Hi! Thanks for debugging this - I think there is in fact an issue in the 0.9 consumer.
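From the first trace below, the consumer's run() ends up in a plain Thread.join() on its internal fetch thread, so if that thread never returns from poll(), the cancel signal has nothing to act on. Roughly the shape I would expect a fix to take - a hedged sketch with invented names (CancellableFetcher etc.), not the actual connector code:

// Hedged sketch only -- invented names, not the actual FlinkKafkaConsumer09 code.
// Idea: cancel() must wake the fetcher out of a blocked poll(), and the
// outer thread should not join() it without a timeout.
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

public class CancellableFetcher {
    private final KafkaConsumer<byte[], byte[]> consumer;
    private final Thread fetchThread;
    private volatile boolean running = true;

    public CancellableFetcher(Properties props) {
        consumer = new KafkaConsumer<>(props);
        fetchThread = new Thread(() -> {
            try {
                while (running) {
                    consumer.poll(100);      // may block; wakeup() aborts it
                }
            } catch (WakeupException e) {
                // expected on cancel -- fall through to cleanup
            } finally {
                consumer.close();
            }
        }, "kafka-fetcher");
    }

    public void start() {
        fetchThread.start();
    }

    public void cancel() throws InterruptedException {
        running = false;
        consumer.wakeup();        // wakeup() is the one thread-safe method
        fetchThread.join(10_000); // bounded join instead of join()
        if (fetchThread.isAlive()) {
            fetchThread.interrupt(); // last resort
        }
    }
}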
I'll open a ticket for it and will try to fix it as soon as possible...

Stephan

On Wed, Mar 9, 2016 at 1:59 PM, Maciek Próchniak <m...@touk.pl> wrote:

> Hi,
>
> from time to time, when we cancel streaming jobs (or they fail for some
> reason), we encounter:
>
> 2016-03-09 10:25:29,799 [Canceler for Source: read objects from topic:
> (...)' did not react to cancelling signal, but is stuck in method:
> java.lang.Object.wait(Native Method)
> java.lang.Thread.join(Thread.java:1253)
> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:329)
> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
> java.lang.Thread.run(Thread.java:745)
>
> Now, the relevant stack trace is this:
>
> "Thread-626104" #1244254 daemon prio=5 os_prio=0 tid=... nid=0x2e96 in
> Object.wait() [0x00007f2bac847000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
>     - locked <0x000000041ae00180> (a java.util.ArrayDeque)
>     at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
>     - locked <0x00000004be0002f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
>     at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>     at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
>     at scala.collection.immutable.List.foreach(List.scala:381)
>     at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
>     at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>     at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>     at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>     at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>     at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>     - locked <0x000000041ae001c8> (a java.lang.Object)
>     at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>     - locked <0x000000041ae001c8> (a java.lang.Object)
>
> and also:
>
> "OutputFlusher" #1244231 daemon prio=5 os_prio=0 tid=0x00007f2a39d4e800
> nid=0x2e7d waiting for monitor entry [0x00007f2a3e5e4000]
>    java.lang.Thread.State: BLOCKED (on object monitor)
>     at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:172)
>     - waiting to lock <0x00000004be0002f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>     at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:176)
>
> It looks like the source tries to process the remaining Kafka messages,
> but gets stuck on partitioning by key.
>
> I managed to take a heap dump and look at what's going on inside
> LocalBufferPool:
> - the availableMemorySegments queue is empty
> - numberOfRequestedMemorySegments == currentPoolSize == 16
> - there are no registeredListeners
>
> So it seems the loop at LocalBufferPool#142 spins without end, waiting
> for a buffer to be recycled - but from what I can see that will never
> happen, because the OutputFlusher is blocked by this very loop.
>
> The problem occurs (it seems) when we start a new job at more or less
> the same time as a job cancellation (e.g. the taskmanager is restarted,
> one job is failing because of some problem, and another one is just
> starting) - so I wonder whether it could be some problem with the
> setNumBuffers method, although it looks synchronized enough...
>
> We are using version 1.0.0 (RC4), btw.
>
> I hope to dig further into this, but for now this is all I managed to
> find.
>
> thanks,
> maciek
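Regarding the buffer pool part: the two stack traces form a neat cycle. The task thread holds the SpanningRecordSerializer monitor while it waits for a memory segment to come back to the pool, and the OutputFlusher - the thread that would push the current buffer out and eventually let segments be recycled - is blocked on that same monitor. A condensed, purely hypothetical illustration of that shape (invented names, not Flink code):

// Hypothetical illustration only of the lock cycle visible in the
// two stack traces above (invented names, not Flink code).
import java.util.ArrayDeque;

public class BufferStallDemo {
    private final Object serializerLock = new Object();          // <0x...be0002f0>
    private final ArrayDeque<byte[]> availableSegments = new ArrayDeque<>(); // empty

    // Task thread (cf. RecordWriter.emit): holds serializerLock while it
    // blocks until some segment is recycled back into the pool.
    void emit(byte[] record) throws InterruptedException {
        synchronized (serializerLock) {
            byte[] segment;
            synchronized (availableSegments) {
                while (availableSegments.isEmpty()) {
                    availableSegments.wait(2000);   // TIMED_WAITING, forever here
                }
                segment = availableSegments.poll();
            }
            // ... serialize record into segment ...
        }
    }

    // OutputFlusher thread (cf. RecordWriter.flush): would hand the current
    // buffer to the network stack, which is what ultimately returns segments
    // to the pool -- but it first needs serializerLock, which emit() above
    // never releases.
    void flush() {
        synchronized (serializerLock) {             // BLOCKED here
            // ... flush current buffer ...
        }
    }
}

If that reading is right, the wait in LocalBufferPool can only end if a segment is recycled from outside this pair of threads, which would match the drained pool you saw in the heap dump.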