Zhongyi Sun created FLINK-32826: ----------------------------------- Summary: Our job is stuck on requestMemorySegmentBlocking Key: FLINK-32826 URL: https://issues.apache.org/jira/browse/FLINK-32826 Project: Flink Issue Type: Bug Components: Runtime / Network Affects Versions: 1.16.2, 1.16.0 Reporter: Zhongyi Sun
We have a Flink job that often gets stuck while requesting a memory segment. {code:java} "shardConsumers-Source: smartad-reject-production[1] -> Calc[2] (1/1)#0-thread-0" Id=107 WAITING on java.util.concurrent.CompletableFuture$Signaller@62e01595 at java.base@11.0.20/jdk.internal.misc.Unsafe.park(Native Method) - waiting on java.util.concurrent.CompletableFuture$Signaller@62e01595 at java.base@11.0.20/java.util.concurrent.locks.LockSupport.park(Unknown Source) at java.base@11.0.20/java.util.concurrent.CompletableFuture$Signaller.block(Unknown Source) at java.base@11.0.20/java.util.concurrent.ForkJoinPool.managedBlock(Unknown Source) at java.base@11.0.20/java.util.concurrent.CompletableFuture.waitingGet(Unknown Source) at java.base@11.0.20/java.util.concurrent.CompletableFuture.get(Unknown Source) at app//org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:383) at app//org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:355) at app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:414) at app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:390) at app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:328) at app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:161) at app//org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107) at app//org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54) at app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105) at 
app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91) at app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45) at app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) at app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) at StreamExecCalc$23.processElement_split2(Unknown Source) at StreamExecCalc$23.processElement(Unknown Source) at app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82) at app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57) at app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56) at app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29) at app//org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:423) at app//org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:528) at app//org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collectWithTimestamp(StreamSourceContexts.java:108) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1028) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:113) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:315) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:332) at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:329) at org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1012) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:219) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.lambda$run$0(ShardConsumer.java:126) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer$$Lambda$1780/0x0000000840f89440.accept(Unknown Source) at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120) at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher$$Lambda$1781/0x0000000840f89840.accept(Unknown Source) at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:360) at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:189) at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:169) at org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:124) at org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114) at java.base@11.0.20/java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source) at java.base@11.0.20/java.util.concurrent.FutureTask.run(Unknown Source) at java.base@11.0.20/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source) at java.base@11.0.20/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source) at 
java.base@11.0.20/java.lang.Thread.run(Unknown Source) {code} We also observed high backpressure but could not find its cause. The downstream writer thread was waiting for a message from its mailbox: {code:java} "sn-feature-ad3l[3]: Writer (1/1)#0" Id=91 TIMED_WAITING on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2072929 at java.base@11.0.20/jdk.internal.misc.Unsafe.park(Native Method) - waiting on java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2072929 at java.base@11.0.20/java.util.concurrent.locks.LockSupport.parkNanos(Unknown Source) at java.base@11.0.20/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown Source) at app//org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149) at app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:363) at app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352) at app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229) at app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836) {code} Some observations: * The job logic is simple: it consumes from an AWS Kinesis stream, applies some filtering, and writes the results to another Kinesis stream. * The job gets stuck after running for 3-4 days. ** If we restart from the last checkpoint, the job soon gets stuck again. ** If we restart without a checkpoint, the job recovers, but it may get stuck again within a few days. * We have several jobs consuming different Kinesis streams, but only this one has the problem. This Kinesis stream has only one shard, and its data volume is small. * We were initially on 1.16.0; after finding LocalBufferPool-related issues such as FLINK-29298 and FLINK-31293, we upgraded to the latest release, 1.16.2, but the issue was not resolved.
The heap dump of LocalBufferPool: {code:java} @LocalBufferPool[ LOG=@Log4jLogger[ FQCN=@String[org.apache.logging.slf4j.Log4jLogger], serialVersionUID=@Long[7869000638091304316], EVENT_MARKER=@Log4jMarker[org.apache.logging.slf4j.Log4jMarker@3f47a99], CONVERTER=null, eventLogger=@Boolean[false], logger=@Logger[org.apache.flink.runtime.io.network.buffer.LocalBufferPool:INFO in 4783da3f], name=@String[org.apache.flink.runtime.io.network.buffer.LocalBufferPool], ], UNKNOWN_CHANNEL=@Integer[-1], networkBufferPool=@NetworkBufferPool[ UNBOUNDED_POOL_SIZE=@Integer[2147483647], USAGE_WARNING_THRESHOLD=@Integer[100], LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@16f3a390], totalNumberOfMemorySegments=@Integer[5079], memorySegmentSize=@Integer[32768], availableMemorySegments=@ArrayDeque[isEmpty=false;size=5068], isDestroyed=@Boolean[false], factoryLock=@Object[java.lang.Object@5151b0a4], allBufferPools=@HashSet[isEmpty=false;size=2], resizableBufferPools=@HashSet[isEmpty=false;size=2], numTotalRequiredBuffers=@Integer[3], requestSegmentsTimeout=@Duration[PT30S], availabilityHelper=@AvailabilityHelper[AVAILABLE], lastCheckedUsage=@Integer[0], $assertionsDisabled=@Boolean[true], ], numberOfRequiredMemorySegments=@Integer[2], availableMemorySegments=@ArrayDeque[isEmpty=true;size=0], registeredListeners=@ArrayDeque[isEmpty=true;size=0], maxNumberOfMemorySegments=@Integer[10], currentPoolSize=@Integer[10], numberOfRequestedMemorySegments=@Integer[10], maxBuffersPerChannel=@Integer[10], subpartitionBuffersCount=@int[][ @Integer[10], ], subpartitionBufferRecyclers=@BufferRecycler[][ @SubpartitionBufferRecycler[org.apache.flink.runtime.io.network.buffer.LocalBufferPool$SubpartitionBufferRecycler@41b3ff09], ], unavailableSubpartitionsCount=@Integer[1], maxOverdraftBuffersPerGate=@Integer[0], isDestroyed=@Boolean[false], availabilityHelper=@AvailabilityHelper[ availableFuture=@CompletableFuture[java.util.concurrent.CompletableFuture@4570da85[Not completed, 1 dependents]], ], 
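requestingNotificationOfGlobalPoolAvailable=@Boolean[false], $assertionsDisabled=@Boolean[true], ] {code} In this dump, the local pool has requested all 10 of its segments (numberOfRequestedMemorySegments=10, maxNumberOfMemorySegments=10) and has none available. Its single subpartition holds 10 buffers, equal to maxBuffersPerChannel=10, and is counted as unavailable (unavailableSubpartitionsCount=1). Meanwhile, the global NetworkBufferPool still has 5068 of 5079 segments free. Could you give us some clues on how to troubleshoot this problem? If you need more information, please let me know. Thank you! -- This message was sent by Atlassian Jira (v8.20.10#820010)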