Greg Hogan created FLINK-2685:
---------------------------------

             Summary: TaskManager deadlock on NetworkBufferPool
                 Key: FLINK-2685
                 URL: https://issues.apache.org/jira/browse/FLINK-2685
             Project: Flink
          Issue Type: Bug
          Components: Distributed Runtime
    Affects Versions: master
            Reporter: Greg Hogan

This deadlock occurs intermittently. I have a {join} followed by a {chain<join,filter>} followed by a {reduceGroup}. Stack traces and local variables from one thread of each {join} are below.

The {join}s are waiting on a buffer to become available ({networkBufferPool.availableMemorySegments.count=0}). Both {LocalBufferPool}s have been given extra capacity ({currentPoolSize=60 > numberOfRequiredMemorySegments=32}). The first {join} is at full capacity ({currentPoolSize=numberOfRequestedMemorySegments=60}) yet the second {join} has not acquired any ({numberOfRequestedMemorySegments=0}).

{LocalBufferPool.returnExcessMemorySegments} only recycles {MemorySegment}s from its {availableMemorySegments}, so any requested {Buffer}s will only be released when explicitly recycled.

First join stack trace and variable values from {LocalBufferPool.requestBuffer}:

{noformat}
owns: SpanningRecordSerializer<T>  (id=723)
waiting for: ArrayDeque<E>  (id=724)
Object.wait(long) line: not available [native method]
LocalBufferPool.requestBuffer(boolean) line: 163
LocalBufferPool.requestBufferBlocking() line: 133
RecordWriter<T>.emit(T) line: 92
OutputCollector<T>.collect(T) line: 65
JoinOperator$ProjectFlatJoinFunction<T1,T2,R>.join(T1, T2, Collector<R>) line: 1088
ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 137
JoinDriver<IT1,IT2,OT>.run() line: 208
RegularPactTask<S,OT>.run() line: 489
RegularPactTask<S,OT>.invoke() line: 354
Task.run() line: 581
Thread.run() line: 745
{noformat}

{noformat}
this  LocalBufferPool  (id=403)
  availableMemorySegments  ArrayDeque<E>  (id=398)
    elements  Object[16]  (id=422)
    head  14
    tail  14
  currentPoolSize  60
  isDestroyed  false
  networkBufferPool  NetworkBufferPool  (id=354)
    allBufferPools  HashSet<E>  (id=424)
    availableMemorySegments  ArrayBlockingQueue<E>  (id=427)
      count  0
      items  Object[10240]  (id=674)
      itrs  null
      lock  ReentrantLock  (id=675)
      notEmpty  AbstractQueuedSynchronizer$ConditionObject  (id=678)
      notFull  AbstractQueuedSynchronizer$ConditionObject  (id=679)
      putIndex  6954
      takeIndex  6954
    factoryLock  Object  (id=430)
    isDestroyed  false
    managedBufferPools  HashSet<E>  (id=431)
    memorySegmentSize  32768
    numTotalRequiredBuffers  3226
    totalNumberOfMemorySegments  10240
  numberOfRequestedMemorySegments  60
  numberOfRequiredMemorySegments  32
  owner  null
  registeredListeners  ArrayDeque<E>  (id=421)
    elements  Object[16]  (id=685)
    head  0
    tail  0
askToRecycle  false
isBlocking  true
{noformat}

Second join stack trace and variable values from {SingleInputGate.getNextBufferOrEvent}:

{noformat}
Unsafe.park(boolean, long) line: not available [native method]
LockSupport.parkNanos(Object, long) line: 215
AbstractQueuedSynchronizer$ConditionObject.awaitNanos(long) line: 2078
LinkedBlockingQueue<E>.poll(long, TimeUnit) line: 467
SingleInputGate.getNextBufferOrEvent() line: 414
MutableRecordReader<T>(AbstractRecordReader<T>).getNextRecord(T) line: 79
MutableRecordReader<T>.next(T) line: 34
ReaderIterator<T>.next(T) line: 59
MutableHashTable$ProbeIterator<PT>.next() line: 1581
MutableHashTable<BT,PT>.processProbeIter() line: 457
MutableHashTable<BT,PT>.nextRecord() line: 555
ReusingBuildSecondHashMatchIterator<V1,V2,O>.callWithNextKey(FlatJoinFunction<V1,V2,O>, Collector<O>) line: 110
JoinDriver<IT1,IT2,OT>.run() line: 208
RegularPactTask<S,OT>.run() line: 489
RegularPactTask<S,OT>.invoke() line: 354
Task.run() line: 581
Thread.run() line: 745
{noformat}

{noformat}
this  SingleInputGate  (id=693)
  bufferPool  LocalBufferPool  (id=706)
    availableMemorySegments  ArrayDeque<E>  (id=716)
      elements  Object[16]  (id=717)
      head  0
      tail  0
    currentPoolSize  60
    isDestroyed  false
    networkBufferPool  NetworkBufferPool  (id=354)
      allBufferPools  HashSet<E>  (id=424)
      availableMemorySegments  ArrayBlockingQueue<E>  (id=427)
        count  0
        items  Object[10240]  (id=674)
        itrs  null
        lock  ReentrantLock  (id=675)
        notEmpty  AbstractQueuedSynchronizer$ConditionObject  (id=678)
        notFull  AbstractQueuedSynchronizer$ConditionObject  (id=679)
        putIndex  6954
        takeIndex  6954
      factoryLock  Object  (id=430)
      isDestroyed  false
      managedBufferPools  HashSet<E>  (id=431)
      memorySegmentSize  32768
      numTotalRequiredBuffers  3226
      totalNumberOfMemorySegments  10240
    numberOfRequestedMemorySegments  0
    numberOfRequiredMemorySegments  32
    owner  null
    registeredListeners  ArrayDeque<E>  (id=718)
  channelsWithEndOfPartitionEvents  BitSet  (id=707)
  consumedResultId  IntermediateDataSetID  (id=708)
  consumedSubpartitionIndex  24
  executionId  ExecutionAttemptID  (id=709)
  hasReceivedAllEndOfPartitionEvents  false
  inputChannels  HashMap<K,V>  (id=710)
  inputChannelsWithData  LinkedBlockingQueue<E>  (id=692)
    capacity  2147483647
    count  AtomicInteger  (id=698)
      value  0
    head  LinkedBlockingQueue$Node<E>  (id=701)
    last  LinkedBlockingQueue$Node<E>  (id=701)
    notEmpty  AbstractQueuedSynchronizer$ConditionObject  (id=691)
    notFull  AbstractQueuedSynchronizer$ConditionObject  (id=703)
    putLock  ReentrantLock  (id=704)
    takeLock  ReentrantLock  (id=705)
  isReleased  false
  jobId  JobID  (id=711)
  numberOfInputChannels  32
  numberOfUninitializedChannels  0
  owningTaskName  "Join (25/32) (d88748c8d07d430a85bec52cb82c0214)" (id=712)
  partitionStateChecker  NetworkEnvironment$JobManagerPartitionStateChecker  (id=363)
  pendingEvents  ArrayList<E>  (id=713)
  registeredListeners  CopyOnWriteArrayList<E>  (id=714)
  requestedPartitionsFlag  true
  requestLock  Object  (id=715)
  retriggerLocalRequestTimer  null
currentChannel  null
{noformat}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
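
To make the buffer accounting described in this report easier to reason about, here is a toy model in Java. It is only a sketch under stated assumptions, not Flink's code: {GlobalPool}, {LocalPool}, {shrinkTo} and {simulate} are hypothetical names, the sizes are scaled down to 64 segments, and blocking is replaced by a null return. It illustrates one way the reported state could arise: a pool that shrinks can only hand back idle segments, so a second pool finds the shared pool empty until the first pool's in-flight buffers are explicitly recycled.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;

public class BufferStarvationSketch {

    /** Toy stand-in for the shared pool: only counts free segments. */
    static final class GlobalPool {
        private int free;
        GlobalPool(int total) { free = total; }
        boolean tryTake() {
            if (free == 0) return false;
            free--;
            return true;
        }
        void giveBack() { free++; }
        int free() { return free; }
    }

    /** Toy stand-in for a per-task pool that draws on the shared pool. */
    static final class LocalPool {
        private final GlobalPool global;
        private final ArrayDeque<Object> idle = new ArrayDeque<>();
        private int poolSize;
        private int requested; // segments taken from the shared pool, idle or in flight

        LocalPool(GlobalPool global, int poolSize) {
            this.global = global;
            this.poolSize = poolSize;
        }

        /** Non-blocking request; null marks the point where a blocking caller would wait. */
        Object request() {
            if (!idle.isEmpty()) return idle.poll();
            if (requested < poolSize && global.tryTake()) {
                requested++;
                return new Object();
            }
            return null;
        }

        /** Explicit recycle: segments above the quota go back to the shared pool. */
        void recycle(Object segment) {
            if (requested > poolSize) {
                requested--;
                global.giveBack();
            } else {
                idle.add(segment);
            }
        }

        /** Shrink the quota, returning only idle segments; in-flight ones stay out. */
        void shrinkTo(int newSize) {
            poolSize = newSize;
            while (requested > poolSize && !idle.isEmpty()) {
                idle.poll();
                requested--;
                global.giveBack();
            }
        }
    }

    /** Returns {free segments after shrink, 1 if B was served before A recycled, 1 if after}. */
    public static int[] simulate() {
        GlobalPool global = new GlobalPool(64);
        LocalPool a = new LocalPool(global, 64);   // A initially holds all the extra capacity
        List<Object> inFlight = new ArrayList<>();
        for (int i = 0; i < 64; i++) inFlight.add(a.request());

        LocalPool b = new LocalPool(global, 32);   // B registers later
        a.shrinkTo(32);                            // nothing idle, so nothing is returned
        int freeAfterShrink = global.free();
        boolean servedBefore = b.request() != null; // a blocking request would wait here

        a.recycle(inFlight.remove(0));             // A explicitly recycles one buffer
        boolean servedAfter = b.request() != null;
        return new int[] { freeAfterShrink, servedBefore ? 1 : 0, servedAfter ? 1 : 0 };
    }

    public static void main(String[] args) {
        int[] r = simulate();
        System.out.println("free after shrink=" + r[0]
                + ", B served before recycle=" + (r[1] == 1)
                + ", B served after recycle=" + (r[2] == 1));
    }
}
```

In this model B is served only after A recycles a buffer. If A is itself blocked before it can recycle anything, neither side makes progress, which is the shape of the state captured in the dumps above.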