Flink version is 1.5.3 (Hadoop 2.7 build).
_____________________________________________
From: Narayanaswamy, Krishna [Tech]
Sent: Wednesday, October 03, 2018 3:42 PM
To: 'user@flink.apache.org' <user@flink.apache.org>
Subject: Memory Allocate/Deallocate related Thread Deadlock encountered when running a large job > 10k tasks


Hi,

I am trying to run a single large job graph with more than 10k tasks. The
graph looks roughly like this:
DataSource -> Filter -> Map [...multiple]
==>     Sink1
==>     Sink2
I am using a parallelism of 10, with 1 slot per TaskManager (TM) and 32 GB of
memory allocated to each TM. The JobManager (JM) runs with 8 GB.
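For a sense of scale, here is a back-of-the-envelope sketch of how many network buffers one such TM might get. It assumes the Flink 1.5 defaults for `taskmanager.network.memory.fraction` (0.1), `taskmanager.network.memory.min` (64 MB), `taskmanager.network.memory.max` (1 GB) and `taskmanager.memory.segment-size` (32 KB), and that the fraction is applied to the full 32 GB; the base Flink actually uses depends on the deployment mode, so treat the result as an estimate. The class and method names are hypothetical.

```java
/**
 * Rough estimate of the network buffer pool size for one TaskManager.
 * ASSUMPTIONS (not from the email): Flink 1.5 defaults of
 * network memory fraction 0.1, min 64 MB, max 1 GB, segment size 32 KB,
 * with the fraction applied to the whole 32 GB TM allocation.
 */
class NetworkBufferEstimate {

    static final long MB = 1024L * 1024L;
    static final long GB = 1024L * MB;

    /** Network memory = fraction * total, clamped to [min, max]. */
    static long networkMemoryBytes(long totalBytes, double fraction, long minBytes, long maxBytes) {
        long byFraction = (long) (totalBytes * fraction);
        return Math.min(maxBytes, Math.max(minBytes, byFraction));
    }

    /** Number of fixed-size memory segments (network buffers) that fit. */
    static long numberOfBuffers(long networkBytes, long segmentSizeBytes) {
        return networkBytes / segmentSizeBytes;
    }

    public static void main(String[] args) {
        long total = 32L * GB;  // 32 GB per TM, as in the email
        long network = networkMemoryBytes(total, 0.1, 64L * MB, 1L * GB);
        long buffers = numberOfBuffers(network, 32L * 1024L);
        System.out.println("network memory (MB): " + network / MB); // 1024
        System.out.println("network buffers:     " + buffers);      // 32768
    }
}
```

Under these assumptions 10% of 32 GB exceeds the 1 GB cap, so the TM ends up with 32768 buffers of 32 KB, shared by all tasks running on it.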

Everything starts up and runs fine until roughly 6-7k tasks have completed
(the exact number varies, and these are mostly the source/filter/map
portions); then the graph just hangs. I managed to connect to the task
managers and take a thread dump just in time, and found the following
deadlock on one of the TMs, which appears to be holding up everything else.
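Because the locks in the dump are plain `synchronized` monitors (the `- locked <...> (a java.util.ArrayDeque)` lines), the JVM's own deadlock detector should be able to see them, so the hang does not have to be caught by hand. A minimal sketch of an in-process watchdog built on `java.lang.management.ThreadMXBean.findDeadlockedThreads()`; the `DeadlockWatchdog` class, its methods and the polling period are hypothetical, not part of Flink, and how it would be started inside a TaskManager JVM is left as an assumption.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.MonitorInfo;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/** HYPOTHETICAL watchdog (not part of Flink) that reports monitor deadlocks. */
class DeadlockWatchdog {

    /** Returns one description per deadlocked thread; empty if there is none. */
    static List<String> describeDeadlocks() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        long[] ids = mx.findDeadlockedThreads();  // null when no deadlock exists
        List<String> out = new ArrayList<>();
        if (ids == null) {
            return out;
        }
        // true, true: include locked monitors and ownable synchronizers
        for (ThreadInfo info : mx.getThreadInfo(ids, true, true)) {
            if (info == null) {
                continue;  // the thread may have terminated in the meantime
            }
            StringBuilder sb = new StringBuilder();
            sb.append('"').append(info.getThreadName()).append("\" ")
              .append(info.getThreadState())
              .append(" waiting for ").append(info.getLockOwnerName())
              .append(" to release lock on ").append(info.getLockName());
            for (MonitorInfo m : info.getLockedMonitors()) {
                sb.append("\n    - locked ").append(m);
            }
            out.add(sb.toString());
        }
        return out;
    }

    /** Checks every periodSeconds and prints any deadlock it finds to stderr. */
    static ScheduledExecutorService start(long periodSeconds) {
        ScheduledExecutorService ses = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "deadlock-watchdog");
            t.setDaemon(true);  // never keep the JVM alive on its own
            return t;
        });
        ses.scheduleAtFixedRate(() -> describeDeadlocks().forEach(System.err::println),
                periodSeconds, periodSeconds, TimeUnit.SECONDS);
        return ses;
    }

    public static void main(String[] args) {
        // prints "no deadlock" when run on its own
        System.out.println(describeDeadlocks().isEmpty() ? "no deadlock" : "deadlock found");
    }
}
```

Without any code changes, running `jstack <pid>` against the TaskManager process should report the same situation in its "Found one Java-level deadlock" section.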
Could someone please take a look and advise whether there is anything I could
do or try out to fix this?

Below are the two thread stacks involved in the deadlock:

Thread-1
"DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002" prio=5 tid=0x3e2 nid=NA waiting for monitor entry
         waiting for Map (Key Extractor) (1/10)@9967 to release lock on <0x2dfb> (a java.util.ArrayDeque)
          at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.releaseMemory(SpillableSubpartition.java:223)
          at org.apache.flink.runtime.io.network.partition.ResultPartition.releaseMemory(ResultPartition.java:373)
          at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.setNumBuffers(LocalBufferPool.java:355)
          - locked <0x2dfd> (a java.util.ArrayDeque)
          at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.redistributeBuffers(NetworkBufferPool.java:402)
          at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:203)
          - locked <0x2da5> (a java.lang.Object)
          at org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.recycleMemorySegments(NetworkBufferPool.java:193)
          at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.returnExclusiveSegments(SingleInputGate.java:318)
          at org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel.releaseAllResources(RemoteInputChannel.java:259)
          at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.getNextBufferOrEvent(SingleInputGate.java:578)
          at org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate.pollNextBufferOrEvent(SingleInputGate.java:507)
          at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.waitAndGetNextInputGate(UnionInputGate.java:213)
          at org.apache.flink.runtime.io.network.partition.consumer.UnionInputGate.getNextBufferOrEvent(UnionInputGate.java:163)
          at org.apache.flink.runtime.io.network.api.reader.AbstractRecordReader.getNextRecord(AbstractRecordReader.java:86)
          at org.apache.flink.runtime.io.network.api.reader.MutableRecordReader.next(MutableRecordReader.java:47)
          at org.apache.flink.runtime.operators.util.ReaderIterator.next(ReaderIterator.java:73)
          at org.apache.flink.runtime.operators.DataSinkTask.invoke(DataSinkTask.java:216)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
          at java.lang.Thread.run(Thread.java:745)


Thread-2
"Map (Key Extractor) (1/10)@9967" prio=5 tid=0xaab nid=NA waiting for monitor entry
  java.lang.Thread.State: BLOCKED
         blocks DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002
         waiting for DataSink (DATA#HadoopFileOutputFormat ) (1/2)@11002 to release lock on <0x2dfd> (a java.util.ArrayDeque)
          at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.recycle(LocalBufferPool.java:261)
          at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.deallocate(NetworkBuffer.java:171)
          at org.apache.flink.shaded.netty4.io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:106)
          at org.apache.flink.runtime.io.network.buffer.NetworkBuffer.recycleBuffer(NetworkBuffer.java:146)
          at org.apache.flink.runtime.io.network.buffer.BufferConsumer.close(BufferConsumer.java:110)
          at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.spillFinishedBufferConsumers(SpillableSubpartition.java:271)
          at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:117)
          - locked <0x2dfb> (a java.util.ArrayDeque)
          at org.apache.flink.runtime.io.network.partition.SpillableSubpartition.add(SpillableSubpartition.java:96)
          - locked <0x2dfc> (a org.apache.flink.runtime.io.network.partition.SpillableSubpartition)
          at org.apache.flink.runtime.io.network.partition.ResultPartition.addBufferConsumer(ResultPartition.java:255)
          at org.apache.flink.runtime.io.network.api.writer.RecordWriter.requestNewBufferBuilder(RecordWriter.java:211)
          at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:142)
          at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:105)
          at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65)
          at org.apache.flink.runtime.operators.util.metrics.CountingCollector.collect(CountingCollector.java:35)
          at org.apache.flink.runtime.operators.MapDriver.run(MapDriver.java:103)
          at org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:503)
          at org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
          at org.apache.flink.runtime.taskmanager.Task.run(Task.java:703)
          at java.lang.Thread.run(Thread.java:745)
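Read together, the two stacks form a classic lock-order inversion: Thread-2 holds `<0x2dfb>` (taken in `SpillableSubpartition.add`) and wants `<0x2dfd>` (in `LocalBufferPool.recycle`), while Thread-1 holds `<0x2dfd>` (taken in `LocalBufferPool.setNumBuffers`) and wants `<0x2dfb>` (in `SpillableSubpartition.releaseMemory`). The sketch reproduces only that shape with two plain objects and confirms it with the JVM's deadlock detector; the lock and thread names are hypothetical stand-ins, and this is not a model of Flink's actual buffer handling.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadMXBean;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Minimal lock-order inversion modelled loosely on the two stacks.
 * HYPOTHETICAL stand-ins, not Flink classes:
 *   subpartitionBuffers ~ ArrayDeque <0x2dfb> locked in SpillableSubpartition.add
 *   poolSegments        ~ ArrayDeque <0x2dfd> locked in LocalBufferPool.setNumBuffers
 */
class LockOrderDeadlock {

    /** Starts two threads that deadlock and returns the names the JVM reports. */
    static List<String> reproduce() throws InterruptedException {
        final Object subpartitionBuffers = new ArrayDeque<Object>();
        final Object poolSegments = new ArrayDeque<Object>();
        // Forces both threads to hold their first lock before either tries the second.
        CountDownLatch bothHoldFirstLock = new CountDownLatch(2);

        Thread map = new Thread(() -> {
            synchronized (subpartitionBuffers) {      // like SpillableSubpartition.add
                arriveAndWait(bothHoldFirstLock);
                synchronized (poolSegments) {         // like LocalBufferPool.recycle
                    // never reached: the other thread holds poolSegments
                }
            }
        }, "Map (Key Extractor)");

        Thread sink = new Thread(() -> {
            synchronized (poolSegments) {             // like LocalBufferPool.setNumBuffers
                arriveAndWait(bothHoldFirstLock);
                synchronized (subpartitionBuffers) {  // like SpillableSubpartition.releaseMemory
                    // never reached: the other thread holds subpartitionBuffers
                }
            }
        }, "DataSink");

        map.setDaemon(true);   // let the JVM exit despite the stuck threads
        sink.setDaemon(true);
        map.start();
        sink.start();

        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        List<String> names = new ArrayList<>();
        for (int i = 0; i < 100 && names.isEmpty(); i++) {  // poll for up to ~5 s
            Thread.sleep(50);
            long[] ids = mx.findDeadlockedThreads();        // null while no deadlock
            if (ids != null) {
                for (long id : ids) {
                    names.add(mx.getThreadInfo(id).getThreadName());
                }
            }
        }
        return names;
    }

    /** Counts down the latch, then waits until the other thread has done the same. */
    private static void arriveAndWait(CountDownLatch latch) {
        latch.countDown();
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("deadlocked: " + reproduce());
    }
}
```

The generic remedy for this shape is to take the two locks in one global order everywhere, or to release the first lock before calling into code that may take the second.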

Thanks,
Krishna.


