Here is the Jira issue: https://issues.apache.org/jira/browse/FLINK-3595
On Wed, Mar 9, 2016 at 2:06 PM, Stephan Ewen <se...@apache.org> wrote:
> Hi!
>
> Thanks for debugging this, I think there is in fact an issue in the
> 0.9 consumer.
>
> I'll open a ticket for it, will try to fix that as soon as possible...
>
> Stephan
>
>
> On Wed, Mar 9, 2016 at 1:59 PM, Maciek Próchniak <m...@touk.pl> wrote:
>
>> Hi,
>>
>> From time to time, when we cancel streaming jobs (or they fail for some
>> reason), we encounter:
>>
>> 2016-03-09 10:25:29,799 [Canceler for Source: read objects from topic:
>> (...) ' did not react to cancelling signal, but is stuck in method:
>> java.lang.Object.wait(Native Method)
>> java.lang.Thread.join(Thread.java:1253)
>> org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:329)
>> org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
>> org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
>> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
>> org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
>> java.lang.Thread.run(Thread.java:745)
>>
>> The relevant stack trace is this:
>>
>> "Thread-626104" #1244254 daemon prio=5 os_prio=0 tid=... nid=0x2e96 in Object.wait() [0x00007f2bac847000]
>>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>>         at java.lang.Object.wait(Native Method)
>>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
>>         - locked <0x000000041ae00180> (a java.util.ArrayDeque)
>>         at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
>>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
>>         - locked <0x00000004be0002f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>>         at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
>>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
>>         at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
>>         at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
>>         at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
>>         at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
>>         at scala.collection.immutable.List.foreach(List.scala:381)
>>         at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
>>         at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>         at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>         at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>         at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>         at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
>>         at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
>>         at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
>>         - locked <0x000000041ae001c8> (a java.lang.Object)
>>         at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
>>         - locked <0x000000041ae001c8> (a java.lang.Object)
>>
>> and also:
>>
>> "OutputFlusher" #1244231 daemon prio=5 os_prio=0 tid=0x00007f2a39d4e800 nid=0x2e7d waiting for monitor entry [0x00007f2a3e5e4000]
>>    java.lang.Thread.State: BLOCKED (on object monitor)
>>         at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:172)
>>         - waiting to lock <0x00000004be0002f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
>>         at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:176)
>>
>> It looks like the source tries to process the remaining Kafka messages, but
>> is stuck on partitioning by key.
>>
>> I managed to take a heap dump and look at what's going on inside the
>> LocalBufferPool:
>> - the availableMemorySegments queue is empty
>> - numberOfRequestedMemorySegments == currentPoolSize == 16
>> - there are no registeredListeners
>>
>> So it seems the loop at LocalBufferPool#142 waits without end for a buffer
>> to be recycled - but from what I see that won't happen, because the
>> OutputFlusher is blocked by this very loop.
>>
>> The problem occurs (it seems) when, more or less at the same time as the job
>> cancellation, we start a new job (e.g. the taskmanager is restarted, one job
>> is failing because of some problem, and another one is just starting) - so I
>> wonder whether it could be some problem with the setNumBuffers method,
>> although it looks synchronized enough...
>>
>> We are using version 1.0.0 (RC4), btw.
>>
>> I hope to dig further into this - but for now this is all I managed to find.
>>
>> thanks,
>> maciek
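To make the hang described above concrete, here is a minimal, self-contained Java
sketch of the same pattern: a thread parked in Object.wait() inside a synchronized
buffer pool cannot be stopped by Thread.join() alone; something has to recycle a
buffer or interrupt it. This is not Flink code - ToyBufferPool and the thread names
are invented for illustration only.

import java.util.ArrayDeque;

public class CancelHangSketch {

    // Tiny stand-in for a buffer pool: requests block until some buffer is recycled.
    static class ToyBufferPool {
        private final ArrayDeque<byte[]> available = new ArrayDeque<>();

        synchronized byte[] requestBufferBlocking() throws InterruptedException {
            while (available.isEmpty()) {
                wait(); // parks here indefinitely if nothing is ever recycled
            }
            return available.poll();
        }

        synchronized void recycle(byte[] segment) {
            available.add(segment);
            notifyAll();
        }
    }

    public static void main(String[] args) throws Exception {
        // Empty pool: all segments are "in flight", as in the heap dump described above.
        ToyBufferPool pool = new ToyBufferPool();

        Thread emitter = new Thread(() -> {
            try {
                pool.requestBufferBlocking(); // stuck, like the consumer thread in the trace
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // an interrupt is what finally releases it
            }
        }, "emitter");
        emitter.start();

        Thread.sleep(500);

        // "Cancel" by joining only, without interrupting or freeing the pool:
        // the join times out because the emitter never wakes up on its own.
        emitter.join(2000);
        System.out.println("emitter alive after join-only cancel: " + emitter.isAlive());

        // Waking the thread (here via interrupt) is required for the join to ever return.
        emitter.interrupt();
        emitter.join();
        System.out.println("emitter terminated after interrupt");
    }
}

Running this prints that the emitter is still alive after the timed join and only
terminates once it is interrupted, which mirrors the "did not react to cancelling
signal" symptom in the log above.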