Zhongyi Sun created FLINK-32826:
-----------------------------------

             Summary: Our job is stuck on requestMemorySegmentBlocking
                 Key: FLINK-32826
                 URL: https://issues.apache.org/jira/browse/FLINK-32826
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Network
    Affects Versions: 1.16.2, 1.16.0
            Reporter: Zhongyi Sun


We have a Flink job and find it often gets stuck on requesting memory segment.

 
{code:java}
"shardConsumers-Source: smartad-reject-production[1] -> Calc[2] 
(1/1)#0-thread-0" Id=107 WAITING on 
java.util.concurrent.CompletableFuture$Signaller@62e01595
    at java.base@11.0.20/jdk.internal.misc.Unsafe.park(Native Method)
    -  waiting on java.util.concurrent.CompletableFuture$Signaller@62e01595
    at java.base@11.0.20/java.util.concurrent.locks.LockSupport.park(Unknown 
Source)
    at 
java.base@11.0.20/java.util.concurrent.CompletableFuture$Signaller.block(Unknown
 Source)
    at java.base@11.0.20/java.util.concurrent.ForkJoinPool.managedBlock(Unknown 
Source)
    at 
java.base@11.0.20/java.util.concurrent.CompletableFuture.waitingGet(Unknown 
Source)
    at java.base@11.0.20/java.util.concurrent.CompletableFuture.get(Unknown 
Source)
    at 
app//org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestMemorySegmentBlocking(LocalBufferPool.java:383)
    at 
app//org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBuilderBlocking(LocalBufferPool.java:355)
    at 
app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewBufferBuilderFromPool(BufferWritingResultPartition.java:414)
    at 
app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.requestNewUnicastBufferBuilder(BufferWritingResultPartition.java:390)
    at 
app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.appendUnicastDataForRecordContinuation(BufferWritingResultPartition.java:328)
    at 
app//org.apache.flink.runtime.io.network.partition.BufferWritingResultPartition.emitRecord(BufferWritingResultPartition.java:161)
    at 
app//org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:107)
    at 
app//org.apache.flink.runtime.io.network.api.writer.ChannelSelectorRecordWriter.emit(ChannelSelectorRecordWriter.java:54)
    at 
app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:105)
    at 
app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:91)
    at 
app//org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:45)
    at 
app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at 
app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at StreamExecCalc$23.processElement_split2(Unknown Source)
    at StreamExecCalc$23.processElement(Unknown Source)
    at 
app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
    at 
app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
    at 
app//org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
    at 
app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
    at 
app//org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
    at 
app//org.apache.flink.streaming.api.operators.StreamSourceContexts$ManualWatermarkContext.processAndCollectWithTimestamp(StreamSourceContexts.java:423)
    at 
app//org.apache.flink.streaming.api.operators.StreamSourceContexts$WatermarkContext.collectWithTimestamp(StreamSourceContexts.java:528)
    at 
app//org.apache.flink.streaming.api.operators.StreamSourceContexts$SwitchingOnClose.collectWithTimestamp(StreamSourceContexts.java:108)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1028)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.access$000(KinesisDataFetcher.java:113)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$AsyncKinesisRecordEmitter.emit(KinesisDataFetcher.java:315)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:332)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher$SyncKinesisRecordEmitter$1.put(KinesisDataFetcher.java:329)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher.emitRecordAndUpdateState(KinesisDataFetcher.java:1012)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.deserializeRecordForCollectionAndUpdateState(ShardConsumer.java:219)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.lambda$run$0(ShardConsumer.java:126)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer$$Lambda$1780/0x0000000840f89440.accept(Unknown
 Source)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.lambda$run$0(FanOutRecordPublisher.java:120)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher$$Lambda$1781/0x0000000840f89840.accept(Unknown
 Source)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.consumeAllRecordsFromKinesisShard(FanOutShardSubscriber.java:360)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutShardSubscriber.subscribeToShardAndConsumeRecords(FanOutShardSubscriber.java:189)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.runWithBackoff(FanOutRecordPublisher.java:169)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.publisher.fanout.FanOutRecordPublisher.run(FanOutRecordPublisher.java:124)
    at 
org.apache.flink.streaming.connectors.kinesis.internals.ShardConsumer.run(ShardConsumer.java:114)
    at 
java.base@11.0.20/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
Source)
    at java.base@11.0.20/java.util.concurrent.FutureTask.run(Unknown Source)
    at 
java.base@11.0.20/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
Source)
    at 
java.base@11.0.20/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
Source)
    at java.base@11.0.20/java.lang.Thread.run(Unknown Source) {code}
 

We also noticed high backpressure, but we couldn't find the reason. The 
downstream writer thread was waiting for a message from mailbox.
{code:java}
"sn-feature-ad3l[3]: Writer (1/1)#0" Id=91 TIMED_WAITING on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2072929   
 at java.base@11.0.20/jdk.internal.misc.Unsafe.park(Native Method)    -  
waiting on 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject@2072929   
 at java.base@11.0.20/java.util.concurrent.locks.LockSupport.parkNanos(Unknown 
Source)    at 
java.base@11.0.20/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(Unknown
 Source)    at 
app//org.apache.flink.streaming.runtime.tasks.mailbox.TaskMailboxImpl.take(TaskMailboxImpl.java:149)
    at 
app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMailsWhenDefaultActionUnavailable(MailboxProcessor.java:363)
    at 
app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.processMail(MailboxProcessor.java:352)
    at 
app//org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:229)
    at 
app//org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:836)
 {code}
Some observations:
 * The job logic is pretty simple: it consumes AWS Kinesis, does some filtering 
and writes results to another Kinesis.
 * The job gets stuck after running for 3-4 days.
 ** If we restart from last checkpoint, the job will get stuck again soon.
 ** If we restart without checkpoint, the job will recover, and may be stuck in 
a few days again.
 * We have several jobs consuming different Kinesis, but only this one has 
problem. This Kinesis has only one shard, and data volume is small.
 * At first we were using 1.16.0, after found some issues like FLINK-29298, 
FLINK-31293 related to LocalBufferPool, we upgraded to the latest 1.16.2, but 
the issue was not solved.

The heap dump of LocalBufferPool:

 
{code:java}
@LocalBufferPool[
        LOG=@Log4jLogger[
            FQCN=@String[org.apache.logging.slf4j.Log4jLogger],
            serialVersionUID=@Long[7869000638091304316],
            
EVENT_MARKER=@Log4jMarker[org.apache.logging.slf4j.Log4jMarker@3f47a99],
            CONVERTER=null,
            eventLogger=@Boolean[false],
            
logger=@Logger[org.apache.flink.runtime.io.network.buffer.LocalBufferPool:INFO 
in 4783da3f],
            
name=@String[org.apache.flink.runtime.io.network.buffer.LocalBufferPool],
        ],
        UNKNOWN_CHANNEL=@Integer[-1],
        networkBufferPool=@NetworkBufferPool[
            UNBOUNDED_POOL_SIZE=@Integer[2147483647],
            USAGE_WARNING_THRESHOLD=@Integer[100],
            LOG=@Log4jLogger[org.apache.logging.slf4j.Log4jLogger@16f3a390],
            totalNumberOfMemorySegments=@Integer[5079],
            memorySegmentSize=@Integer[32768],
            availableMemorySegments=@ArrayDeque[isEmpty=false;size=5068],
            isDestroyed=@Boolean[false],
            factoryLock=@Object[java.lang.Object@5151b0a4],
            allBufferPools=@HashSet[isEmpty=false;size=2],
            resizableBufferPools=@HashSet[isEmpty=false;size=2],
            numTotalRequiredBuffers=@Integer[3],
            requestSegmentsTimeout=@Duration[PT30S],
            availabilityHelper=@AvailabilityHelper[AVAILABLE],
            lastCheckedUsage=@Integer[0],
            $assertionsDisabled=@Boolean[true],
        ],
        numberOfRequiredMemorySegments=@Integer[2],
        availableMemorySegments=@ArrayDeque[isEmpty=true;size=0],
        registeredListeners=@ArrayDeque[isEmpty=true;size=0],
        maxNumberOfMemorySegments=@Integer[10],
        currentPoolSize=@Integer[10],
        numberOfRequestedMemorySegments=@Integer[10],
        maxBuffersPerChannel=@Integer[10],
        subpartitionBuffersCount=@int[][
            @Integer[10],
        ],
        subpartitionBufferRecyclers=@BufferRecycler[][
            
@SubpartitionBufferRecycler[org.apache.flink.runtime.io.network.buffer.LocalBufferPool$SubpartitionBufferRecycler@41b3ff09],
        ],
        unavailableSubpartitionsCount=@Integer[1],
        maxOverdraftBuffersPerGate=@Integer[0],
        isDestroyed=@Boolean[false],
        availabilityHelper=@AvailabilityHelper[
            
availableFuture=@CompletableFuture[java.util.concurrent.CompletableFuture@4570da85[Not
 completed, 1 dependents]],
        ],
        requestingNotificationOfGlobalPoolAvailable=@Boolean[false],
        $assertionsDisabled=@Boolean[true],
    ] {code}
Could you help give some clues on how to troubleshoot such problem? Or if you 
need more information, please let me know, thank you!

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to