Greg Hogan created FLINK-2685:
---------------------------------

             Summary: TaskManager deadlock on NetworkBufferPool
                 Key: FLINK-2685
                 URL: https://issues.apache.org/jira/browse/FLINK-2685
             Project: Flink
          Issue Type: Bug
          Components: Distributed Runtime
    Affects Versions: master
            Reporter: Greg Hogan


This deadlock occurs intermittently. I have a {join} followed by a
{chain<join,filter>} followed by a {reduceGroup}. Stack traces and local
variables from one thread of each {join} are below.

The {join}s are waiting on a buffer to become available
({networkBufferPool.availableMemorySegments.count=0}). Both {LocalBufferPool}s
have been given extra capacity ({currentPoolSize=60 >
numberOfRequiredMemorySegments=32}). The first {join} is at full capacity
({currentPoolSize=numberOfRequestedMemorySegments=60}), yet the second {join}
has not acquired any segments ({numberOfRequestedMemorySegments=0}).
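A minimal sketch of the state described above, assuming a single shared queue of free segments and local pools whose {currentPoolSize} is only an upper bound on what they may take from it. {GlobalPoolModel}, {LocalPoolModel} and {tryRequest} are hypothetical, heavily simplified names; this is not Flink's implementation. The global pool is given only 60 segments so that the first pool alone can drain it; in the report the {NetworkBufferPool} has 10240 segments, all of which are already handed out ({count=0}).

```java
import java.util.concurrent.ArrayBlockingQueue;

// Hypothetical stand-in for NetworkBufferPool: a bounded queue of free segments.
final class GlobalPoolModel {
    final ArrayBlockingQueue<byte[]> availableMemorySegments;

    GlobalPoolModel(int totalNumberOfMemorySegments, int memorySegmentSize) {
        availableMemorySegments = new ArrayBlockingQueue<>(totalNumberOfMemorySegments);
        for (int i = 0; i < totalNumberOfMemorySegments; i++) {
            availableMemorySegments.add(new byte[memorySegmentSize]);
        }
    }
}

// Hypothetical stand-in for LocalBufferPool: currentPoolSize is a cap, not a reservation.
final class LocalPoolModel {
    private final GlobalPoolModel global;
    private final int currentPoolSize;
    private int numberOfRequestedMemorySegments;

    LocalPoolModel(GlobalPoolModel global, int currentPoolSize) {
        this.global = global;
        this.currentPoolSize = currentPoolSize;
    }

    // Returns null if this pool is at its cap or the global pool has nothing left.
    byte[] tryRequest() {
        if (numberOfRequestedMemorySegments >= currentPoolSize) {
            return null;
        }
        byte[] segment = global.availableMemorySegments.poll();
        if (segment != null) {
            numberOfRequestedMemorySegments++;
        }
        return segment;
    }

    int requested() {
        return numberOfRequestedMemorySegments;
    }
}

final class PoolStateDemo {
    public static void main(String[] args) {
        GlobalPoolModel global = new GlobalPoolModel(60, 1024);
        LocalPoolModel firstJoin = new LocalPoolModel(global, 60);
        LocalPoolModel secondJoin = new LocalPoolModel(global, 60);

        // The first pool takes every free segment and keeps them all in flight.
        while (firstJoin.tryRequest() != null) {
            // keep draining until the cap (or the global pool) is exhausted
        }

        System.out.println("first requested = " + firstJoin.requested());                 // 60
        System.out.println("global free     = " + global.availableMemorySegments.size()); // 0
        // The second pool has room for 60 segments but cannot obtain even one.
        System.out.println("second got one? " + (secondJoin.tryRequest() != null));      // false
    }
}
```

In this model, nothing the second pool does can free a segment; only the holder of the first pool's buffers can.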

{LocalBufferPool.returnExcessMemorySegments} only recycles {MemorySegment}s
that are idle in its {availableMemorySegments} queue, so segments already
handed out as {Buffer}s are released only when those buffers are explicitly
recycled.
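A minimal sketch of that behavior, assuming a simplified local pool that borrows from a shared deque. {ShrinkablePoolModel} is hypothetical; {setNumBuffers} and {returnExcessMemorySegments} only borrow their names from the pool under discussion, and the recycle rule (a returning segment goes straight back to the global pool while the local pool is over its size) is an assumed simplification, not Flink's code. It shows that shrinking a pool whose segments are all in use returns nothing to the global pool until the in-flight buffers come back.

```java
import java.util.ArrayDeque;

// Hypothetical, simplified local pool that borrows segments from a shared global deque.
final class ShrinkablePoolModel {
    private final ArrayDeque<Object> globalAvailable; // stands in for NetworkBufferPool
    private final ArrayDeque<Object> availableMemorySegments = new ArrayDeque<>();
    private int numberOfRequestedMemorySegments;
    private int currentPoolSize;

    ShrinkablePoolModel(ArrayDeque<Object> globalAvailable, int currentPoolSize) {
        this.globalAvailable = globalAvailable;
        this.currentPoolSize = currentPoolSize;
    }

    // Prefer an idle local segment; otherwise borrow from the global pool if under the cap.
    Object requestBuffer() {
        Object segment = availableMemorySegments.poll();
        if (segment == null && numberOfRequestedMemorySegments < currentPoolSize) {
            segment = globalAvailable.poll();
            if (segment != null) {
                numberOfRequestedMemorySegments++;
            }
        }
        return segment;
    }

    // Assumed rule: a returning buffer goes to the global pool only if this pool is over its size.
    void recycle(Object segment) {
        if (numberOfRequestedMemorySegments > currentPoolSize) {
            globalAvailable.add(segment);
            numberOfRequestedMemorySegments--;
        } else {
            availableMemorySegments.add(segment);
        }
    }

    void setNumBuffers(int newPoolSize) {
        currentPoolSize = newPoolSize;
        returnExcessMemorySegments();
    }

    // Only idle segments can be handed back here; in-use ones must wait for recycle().
    private void returnExcessMemorySegments() {
        while (numberOfRequestedMemorySegments > currentPoolSize) {
            Object segment = availableMemorySegments.poll();
            if (segment == null) {
                return;
            }
            globalAvailable.add(segment);
            numberOfRequestedMemorySegments--;
        }
    }

    int requested() {
        return numberOfRequestedMemorySegments;
    }
}

final class ShrinkDemo {
    public static void main(String[] args) {
        ArrayDeque<Object> global = new ArrayDeque<>();
        for (int i = 0; i < 60; i++) {
            global.add(new Object());
        }
        ShrinkablePoolModel pool = new ShrinkablePoolModel(global, 60);

        // All 60 segments end up in flight (e.g. queued for a downstream consumer).
        ArrayDeque<Object> inFlight = new ArrayDeque<>();
        for (int i = 0; i < 60; i++) {
            inFlight.add(pool.requestBuffer());
        }

        pool.setNumBuffers(32); // shrink to the required size
        System.out.println(pool.requested() + " requested, " + global.size() + " free"); // 60 requested, 0 free

        pool.recycle(inFlight.poll()); // one buffer is finally released
        System.out.println(pool.requested() + " requested, " + global.size() + " free"); // 59 requested, 1 free
    }
}
```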

First join stack trace and variable values from {LocalBufferPool.requestBuffer}:
{noformat}
owns: SpanningRecordSerializer<T>  (id=723)     
waiting for: ArrayDeque<E>  (id=724)    
Object.wait(long) line: not available [native method]   
LocalBufferPool.requestBuffer(boolean) line: 163        
LocalBufferPool.requestBufferBlocking() line: 133       
RecordWriter<T>.emit(T) line: 92        
OutputCollector<T>.collect(T) line: 65  
JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088
ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137
JoinDriver<IT1,IT2,OT>.run() line: 208  
RegularPactTask<S,OT>.run() line: 489   
RegularPactTask<S,OT>.invoke() line: 354        
Task.run() line: 581    
Thread.run() line: 745  
{noformat}

{noformat}
this    LocalBufferPool  (id=403)       
        availableMemorySegments ArrayDeque<E>  (id=398) 
                elements        Object[16]  (id=422)    
                head    14      
                tail    14      
        currentPoolSize 60      
        isDestroyed     false   
        networkBufferPool       NetworkBufferPool  (id=354)     
                allBufferPools  HashSet<E>  (id=424)    
                availableMemorySegments ArrayBlockingQueue<E>  (id=427) 
                        count   0       
                        items   Object[10240]  (id=674) 
                        itrs    null    
                        lock    ReentrantLock  (id=675) 
                        notEmpty        AbstractQueuedSynchronizer$ConditionObject  (id=678)
                        notFull AbstractQueuedSynchronizer$ConditionObject  (id=679)
                        putIndex        6954    
                        takeIndex       6954    
                factoryLock     Object  (id=430)        
                isDestroyed     false   
                managedBufferPools      HashSet<E>  (id=431)    
                memorySegmentSize       32768   
                numTotalRequiredBuffers 3226    
                totalNumberOfMemorySegments     10240   
        numberOfRequestedMemorySegments 60      
        numberOfRequiredMemorySegments  32      
        owner   null    
        registeredListeners     ArrayDeque<E>  (id=421) 
                elements        Object[16]  (id=685)    
                head    0       
                tail    0       
askToRecycle    false   
isBlocking      true    
{noformat}
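The trace shows the first {join} in {Object.wait(long)} on the {ArrayDeque} behind the pool's {availableMemorySegments}, with {isBlocking=true}, while the pool is at capacity and the deque is empty ({head=tail=14}). A minimal sketch of such a timed wait loop, assuming a pool that first tries the global pool and otherwise waits for one of its own buffers to be recycled. {BlockingRequestModel}, the {Supplier} standing in for the {NetworkBufferPool}, and the {maxWaits} bound (added only so the demo terminates) are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.function.Supplier;

// Hypothetical model of a blocking buffer request that waits on the local deque's monitor.
final class BlockingRequestModel {
    private final ArrayDeque<Object> availableMemorySegments = new ArrayDeque<>();
    private final Supplier<Object> globalRequest; // returns null when the global pool is empty
    private final int currentPoolSize;
    private int numberOfRequestedMemorySegments;

    BlockingRequestModel(int currentPoolSize, Supplier<Object> globalRequest) {
        this.currentPoolSize = currentPoolSize;
        this.globalRequest = globalRequest;
    }

    // Returns a segment, or null if not blocking or nothing arrived after maxWaits timed waits.
    Object requestBuffer(boolean isBlocking, long waitMillis, int maxWaits) {
        synchronized (availableMemorySegments) {
            int waits = 0;
            while (availableMemorySegments.isEmpty()) {
                // Under the cap: try to borrow a fresh segment from the global pool.
                if (numberOfRequestedMemorySegments < currentPoolSize) {
                    Object segment = globalRequest.get();
                    if (segment != null) {
                        numberOfRequestedMemorySegments++;
                        return segment;
                    }
                }
                if (!isBlocking || waits++ >= maxWaits) {
                    return null;
                }
                try {
                    availableMemorySegments.wait(waitMillis); // releases the monitor while waiting
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
            return availableMemorySegments.poll();
        }
    }

    // A consumer hands a buffer back and wakes any waiting requester.
    void recycle(Object segment) {
        synchronized (availableMemorySegments) {
            availableMemorySegments.add(segment);
            availableMemorySegments.notifyAll();
        }
    }
}

final class BlockingDemo {
    public static void main(String[] args) throws InterruptedException {
        Object[] globalSegment = { new Object() }; // the global pool holds a single segment
        BlockingRequestModel pool = new BlockingRequestModel(1, () -> {
            Object segment = globalSegment[0];
            globalSegment[0] = null;
            return segment;
        });

        Object held = pool.requestBuffer(true, 10, 3);               // taken from the global pool
        System.out.println(pool.requestBuffer(true, 10, 3) == null); // true: at capacity, deque empty

        Thread consumer = new Thread(() -> pool.recycle(held));      // the buffer finally comes back
        consumer.start();
        System.out.println(pool.requestBuffer(true, 1000, 5) == held); // true
        consumer.join();
    }
}
```

In this model a pool at its cap can only be woken by a {recycle} of its own buffers, which matches the waiter in the trace having nothing else to wait for.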

Second join stack trace and variable values from 
{SingleInputGate.getNextBufferOrEvent}:
{noformat}
Unsafe.park(boolean, long) line: not available [native method]  
LockSupport.parkNanos(Object, long) line: 215   
AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078  
LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467   
SingleInputGate.getNextBufferOrEvent() line: 414        
MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79       
MutableRecordReader<T>.next(T) line: 34 
ReaderIterator<T>.next(T) line: 59      
MutableHashTable$ProbeIterator<PT>.next() line: 1581    
MutableHashTable<BT,PT>.processProbeIter() line: 457    
MutableHashTable<BT,PT>.nextRecord() line: 555  
ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110
JoinDriver<IT1,IT2,OT>.run() line: 208  
RegularPactTask<S,OT>.run() line: 489   
RegularPactTask<S,OT>.invoke() line: 354        
Task.run() line: 581    
Thread.run() line: 745  
{noformat}

{noformat}
this    SingleInputGate  (id=693)       
        bufferPool      LocalBufferPool  (id=706)       
                availableMemorySegments ArrayDeque<E>  (id=716) 
                        elements        Object[16]  (id=717)    
                        head    0       
                        tail    0       
                currentPoolSize 60      
                isDestroyed     false   
                networkBufferPool       NetworkBufferPool  (id=354)     
                        allBufferPools  HashSet<E>  (id=424)    
                        availableMemorySegments ArrayBlockingQueue<E>  (id=427) 
                                count   0       
                                items   Object[10240]  (id=674) 
                                itrs    null    
                                lock    ReentrantLock  (id=675) 
                                notEmpty        AbstractQueuedSynchronizer$ConditionObject  (id=678)
                                notFull AbstractQueuedSynchronizer$ConditionObject  (id=679)
                                putIndex        6954    
                                takeIndex       6954    
                        factoryLock     Object  (id=430)        
                        isDestroyed     false   
                        managedBufferPools      HashSet<E>  (id=431)    
                        memorySegmentSize       32768   
                        numTotalRequiredBuffers 3226    
                        totalNumberOfMemorySegments     10240   
                numberOfRequestedMemorySegments 0       
                numberOfRequiredMemorySegments  32      
                owner   null    
                registeredListeners     ArrayDeque<E>  (id=718) 
        channelsWithEndOfPartitionEvents        BitSet  (id=707)        
        consumedResultId        IntermediateDataSetID  (id=708) 
        consumedSubpartitionIndex       24      
        executionId     ExecutionAttemptID  (id=709)    
        hasReceivedAllEndOfPartitionEvents      false   
        inputChannels   HashMap<K,V>  (id=710)  
        inputChannelsWithData   LinkedBlockingQueue<E>  (id=692)        
                capacity        2147483647      
                count   AtomicInteger  (id=698) 
                        value   0       
                head    LinkedBlockingQueue$Node<E>  (id=701)   
                last    LinkedBlockingQueue$Node<E>  (id=701)   
                notEmpty        AbstractQueuedSynchronizer$ConditionObject  (id=691)
                notFull AbstractQueuedSynchronizer$ConditionObject  (id=703)    
                putLock ReentrantLock  (id=704) 
                takeLock        ReentrantLock  (id=705) 
        isReleased      false   
        jobId   JobID  (id=711) 
        numberOfInputChannels   32      
        numberOfUninitializedChannels   0       
        owningTaskName  "Join (25/32) (d88748c8d07d430a85bec52cb82c0214)" (id=712)
        partitionStateChecker   NetworkEnvironment$JobManagerPartitionStateChecker  (id=363)
        pendingEvents   ArrayList<E>  (id=713)  
        registeredListeners     CopyOnWriteArrayList<E>  (id=714)       
        requestedPartitionsFlag true    
        requestLock     Object  (id=715)        
        retriggerLocalRequestTimer      null    
currentChannel  null    
{noformat}
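The second {join}'s gate has no channels with data ({inputChannelsWithData.count=0}) and its {bufferPool} holds no segments ({numberOfRequestedMemorySegments=0}). A minimal sketch of why a timed {poll} like the one in the trace would keep coming back empty, assuming (as those two values suggest, but this report does not confirm) that incoming data must first be copied into a segment from the gate's {bufferPool} before its channel is queued. {InputGateModel}, {onDataArrived}, {nextChannelWithData} and the {maxPolls} bound are hypothetical; only {LinkedBlockingQueue.poll(long, TimeUnit)} corresponds directly to a frame in the trace.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical model of an input gate whose channels are queued only once their data is buffered.
final class InputGateModel {
    private final LinkedBlockingQueue<Integer> inputChannelsWithData = new LinkedBlockingQueue<>();
    private final Supplier<Object> bufferPool; // returns null when no segment can be obtained

    InputGateModel(Supplier<Object> bufferPool) {
        this.bufferPool = bufferPool;
    }

    // Network side: data for a channel can be accepted only if a buffer is available.
    boolean onDataArrived(int channelIndex) {
        Object segment = bufferPool.get();
        if (segment == null) {
            return false; // no buffer: the data is never handed to the gate
        }
        inputChannelsWithData.add(channelIndex);
        return true;
    }

    // Task side: timed polls like the LinkedBlockingQueue.poll frame; null after maxPolls empty polls.
    Integer nextChannelWithData(long timeoutMillis, int maxPolls) {
        try {
            for (int i = 0; i < maxPolls; i++) {
                Integer channel = inputChannelsWithData.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (channel != null) {
                    return channel;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return null;
    }
}

final class GateDemo {
    public static void main(String[] args) {
        InputGateModel starved = new InputGateModel(() -> null); // pool cannot get a segment
        System.out.println(starved.onDataArrived(3));             // false
        System.out.println(starved.nextChannelWithData(10, 3));   // null

        InputGateModel healthy = new InputGateModel(Object::new);
        healthy.onDataArrived(3);
        System.out.println(healthy.nextChannelWithData(10, 3));   // 3
    }
}
```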



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
