This is definitely a bug on the Kafka side, because they're not handling uncaught exceptions properly [1]. I don't think there is much we can do on the Flink side here, because we're not able to override the thread factory for the Kafka IO thread :/
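To make the "not handling uncaught exceptions" part concrete: the ERROR line quoted further down comes from a handler on the producer's network thread that only logs the error and does nothing else. A simplified, illustrative sketch of that behaviour (a paraphrase for illustration, not the actual Kafka source; the class name is made up, the thread name and error mirror the log below):

public final class SwallowedIoThreadError {
    public static void main(String[] args) throws InterruptedException {
        // Stand-in for the producer's sender loop, failing the same way as in the log below.
        Runnable senderLoop = () -> { throw new OutOfMemoryError("Direct buffer memory"); };

        Thread ioThread = new Thread(senderLoop, "kafka-producer-network-thread | producer-8");
        ioThread.setDaemon(true);
        // The handler only logs; nothing is rethrown or reported to the owner of the thread,
        // so the producer object stays around but can no longer make any progress.
        ioThread.setUncaughtExceptionHandler((t, e) ->
                System.err.printf("Uncaught exception in thread '%s': %s%n", t.getName(), e));

        ioThread.start();
        ioThread.join();
        System.out.println("Still running: the failure above never reached this code.");
    }
}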
[1] https://issues.apache.org/jira/browse/KAFKA-4228

On Tue, Nov 9, 2021 at 7:38 AM Dongwon Kim <eastcirc...@gmail.com> wrote:

> Hi David,
>
>> There are currently no metrics for the async work-queue size (you should
>> be able to see the queue stats with debug logs enabled though [1]).
>
> Thanks for the input, but scraping DEBUG messages into, for example,
> ElasticSearch for monitoring on Grafana is not possible in my current
> environment.
> I just defined two counters in RichAsyncFunction for tracking # sent
> requests and # finished/failed requests, respectively, and used the two
> counters to calculate the in-flight requests from Prometheus.
>
>> As far as I can tell from looking at the code, the async operator is able
>> to checkpoint even if the work-queue is exhausted.
>
> Oh, I didn't know that! As you pointed out, and as I'm going to explain
> below, the async operator might not be the source of the problem.
>
> I just hit the same situation and found that
> - # of in-flight records is zero when the backpressure is getting high
> - a taskmanager complains the following error message around the time when
>   the backpressure is getting high (all the others don't):
>
>> 2021-11-09 13:20:40,601 ERROR org.apache.kafka.common.utils.KafkaThread [] -
>> Uncaught exception in thread 'kafka-producer-network-thread | producer-8':
>> java.lang.OutOfMemoryError: Direct buffer memory
>>     at java.nio.Bits.reserveMemory(Bits.java:175) ~[?:?]
>>     at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118) ~[?:?]
>>     at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317) ~[?:?]
>>     at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:242) ~[?:?]
>>     at sun.nio.ch.IOUtil.write(IOUtil.java:164) ~[?:?]
>>     at sun.nio.ch.IOUtil.write(IOUtil.java:130) ~[?:?]
>>     at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:493) ~[?:?]
>>     at java.nio.channels.SocketChannel.write(SocketChannel.java:507) ~[?:?]
>>     at org.apache.kafka.common.network.PlaintextTransportLayer.write(PlaintextTransportLayer.java:152) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>     at org.apache.kafka.common.network.ByteBufferSend.writeTo(ByteBufferSend.java:60) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>     at org.apache.kafka.common.network.KafkaChannel.send(KafkaChannel.java:429) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>     at org.apache.kafka.common.network.KafkaChannel.write(KafkaChannel.java:399) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>     at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:589) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>     at org.apache.kafka.common.network.Selector.poll(Selector.java:483) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>     at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>     at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:335) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>     at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244) ~[drivinghabit-stream-calculator-2.0-SNAPSHOT.jar:?]
>>     at java.lang.Thread.run(Thread.java:829) [?:?]
>
> Can it be the reason why my pipeline is stalled and ends up with the
> checkpoint timeout? I guess all the upstream tasks might fail to send data
> to the failed Kafka producer and records are stacking up in buffers, which
> could result in the back-pressure. If so, is there no mechanism in Flink to
> detect such an error and send it to the job manager for debugging purposes?
>
> Best,
>
> Dongwon
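For other readers hitting the same question: the two-counter approach Dongwon describes above can be wired up roughly like this. This is a minimal sketch, not his actual code; the class name, metric names, and the queryExternalSystem placeholder are made up. The difference sentRequests - completedRequests, scraped by Prometheus, approximates the number of in-flight requests; counters are registered per subtask, so sum them across subtasks on the Grafana side.

import java.util.Collections;
import java.util.concurrent.CompletableFuture;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Tracks sent vs. finished/failed requests; (sentRequests - completedRequests) is the
// number of requests currently in flight for this subtask.
public class ExternalLookupFunction extends RichAsyncFunction<String, String> {

    private transient Counter sentRequests;
    private transient Counter completedRequests;

    @Override
    public void open(Configuration parameters) {
        sentRequests = getRuntimeContext().getMetricGroup().counter("sentRequests");
        completedRequests = getRuntimeContext().getMetricGroup().counter("completedRequests");
    }

    @Override
    public void asyncInvoke(String input, ResultFuture<String> resultFuture) {
        sentRequests.inc();
        queryExternalSystem(input).whenComplete((result, error) -> {
            completedRequests.inc(); // counted for both success and failure
            if (error != null) {
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singleton(result));
            }
        });
    }

    // Placeholder for whatever async client the job actually uses.
    private CompletableFuture<String> queryExternalSystem(String input) {
        return CompletableFuture.completedFuture(input);
    }
}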
On Mon, Nov 8, 2021 at 9:21 PM David Morávek <d...@apache.org> wrote:

>> Hi Dongwon,
>>
>> There are currently no metrics for the async work-queue size (you should
>> be able to see the queue stats with debug logs enabled though [1]). As far
>> as I can tell from looking at the code, the async operator is able to
>> checkpoint even if the work-queue is exhausted.
>>
>> Arvid can you please validate the above? (the checkpoints not being
>> blocked by the work queue part)
>>
>> [1]
>> https://github.com/apache/flink/blob/release-1.14.0/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java#L109
>>
>> Best,
>> D.
>>
>> On Sun, Nov 7, 2021 at 10:41 AM Dongwon Kim <eastcirc...@gmail.com> wrote:
>>
>>> Hi community,
>>>
>>> While using Flink's async i/o for interacting with an external system, I
>>> got the following exception:
>>>
>>> 2021-11-06 10:38:35,270 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
>>>   Triggering checkpoint 54 (type=CHECKPOINT) @ 1636162715262 for job f168a44ea33198cd71783824d49f9554.
>>> 2021-11-06 10:38:47,031 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
>>>   Completed checkpoint 54 for job f168a44ea33198cd71783824d49f9554 (11930992707 bytes,
>>>   checkpointDuration=11722 ms, finalizationTime=47 ms).
>>> 2021-11-06 10:58:35,270 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
>>>   Triggering checkpoint 55 (type=CHECKPOINT) @ 1636163915262 for job f168a44ea33198cd71783824d49f9554.
>>> 2021-11-06 11:08:35,271 INFO org.apache.flink.runtime.checkpoint.CheckpointCoordinator [] -
>>>   Checkpoint 55 of job f168a44ea33198cd71783824d49f9554 expired before completing.
>>> 2021-11-06 11:08:35,287 INFO org.apache.flink.runtime.jobmaster.JobMaster [] -
>>>   Trying to recover from a global failure.
>>>
>>> - FYI, I'm using 1.14.0 and enabled unaligned checkpointing and buffer
>>>   debloating
>>> - the 55th ckpt failed to complete within 10 mins (which is the value of
>>>   execution.checkpointing.timeout)
>>> - the below graph shows that backpressure skyrocketed around the time
>>>   the 55th ckpt began
>>> [image: image.png]
>>>
>>> What I suspect is the capacity of the asynchronous operation, because
>>> limiting the value can cause back-pressure once the capacity is exhausted [1].
>>>
>>> Although I could increase the value, I want to monitor the current
>>> in-flight async i/o requests like the above back-pressure graph on Grafana.
>>> [2] does not introduce any system metric specific to async i/o.
>>>
>>> Best,
>>>
>>> Dongwon
>>>
>>> [1]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/datastream/operators/asyncio/#async-io-api
>>> [2]
>>> https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/metrics/#system-metrics
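As a closing note for the archives: the capacity suspected in the original mail is the last argument of the AsyncDataStream call that wires the async function into the job; once that many requests are in flight, the operator back-pressures its input, which is what [1] above describes. A minimal sketch (the input stream, timeout, and capacity values are placeholders, not the real pipeline; it reuses the counter-instrumented function sketched earlier):

import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AsyncCapacitySketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> input = env.fromElements("a", "b", "c"); // placeholder source

        // The last argument caps the number of concurrent async requests per subtask;
        // when it is exhausted, the operator back-pressures upstream instead of failing.
        DataStream<String> enriched = AsyncDataStream.orderedWait(
                input,
                new ExternalLookupFunction(), // the counter-instrumented function sketched above
                30, TimeUnit.SECONDS,         // per-request timeout
                100);                         // capacity = max in-flight requests

        enriched.print();
        env.execute("async-capacity-sketch");
    }
}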