Thanks,

that makes sense...
Guess I'll try some dirty workaround for now by interrupting the consumer thread if it doesn't finish after some time...
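
A minimal sketch of that kind of workaround, assuming the consumer thread is reachable as a plain java.lang.Thread and that the code it is stuck in reacts to interruption (both are assumptions; JoinWithTimeout and joinOrInterrupt are hypothetical names, not part of Flink):

```java
class JoinWithTimeout {

    /**
     * Waits up to timeoutMillis for the worker to finish. If it is still
     * alive after that, interrupts it and waits once more.
     * Returns true if the worker has terminated.
     * Note: timeoutMillis must be positive; join(0) would wait forever.
     */
    static boolean joinOrInterrupt(Thread worker, long timeoutMillis) {
        try {
            worker.join(timeoutMillis);
            if (worker.isAlive()) {
                worker.interrupt();
                worker.join(timeoutMillis);
            }
        } catch (InterruptedException e) {
            // The calling thread was interrupted itself: restore the flag and give up waiting.
            Thread.currentThread().interrupt();
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        final Object lock = new Object();
        // Stand-in for a consumer thread stuck in a timed-wait loop.
        Thread stuck = new Thread(() -> {
            synchronized (lock) {
                try {
                    while (true) {
                        lock.wait(2000);
                    }
                } catch (InterruptedException e) {
                    // Interrupted: leave the loop and let the thread end.
                }
            }
        });
        stuck.setDaemon(true);
        stuck.start();
        System.out.println("worker terminated: " + joinOrInterrupt(stuck, 500));
    }
}
```

This only helps if the blocked call actually throws InterruptedException and nothing swallows it further up the stack, which I have not verified for the consumer.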

maciek

On 09/03/2016 14:42, Stephan Ewen wrote:
Here is the Jira issue: https://issues.apache.org/jira/browse/FLINK-3595

On Wed, Mar 9, 2016 at 2:06 PM, Stephan Ewen <se...@apache.org> wrote:

    Hi!

    Thanks for debugging this. I think there is in fact an issue
    in the 0.9 consumer.

    I'll open a ticket for it and will try to fix it as soon as possible...

    Stephan


    On Wed, Mar 9, 2016 at 1:59 PM, Maciek Próchniak <m...@touk.pl> wrote:

        Hi,

        from time to time when we cancel streaming jobs (or they are
        failing for some reason) we encounter:

        2016-03-09 10:25:29,799 [Canceler for Source: read objects
        from topic: (...) ' did not react to cancelling signal, but is
        stuck in method:
        java.lang.Object.wait(Native Method)
        java.lang.Thread.join(Thread.java:1253)
        org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09.run(FlinkKafkaConsumer09.java:329)
        org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78)
        org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56)
        org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:224)
        org.apache.flink.runtime.taskmanager.Task.run(Task.java:559)
        java.lang.Thread.run(Thread.java:745)


        Now, the relevant stack trace is this:

        "Thread-626104" #1244254 daemon prio=5 os_prio=0 tid=... nid=0x2e96 in Object.wait() [0x00007f2bac847000]
           java.lang.Thread.State: TIMED_WAITING (on object monitor)
                at java.lang.Object.wait(Native Method)
                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:163)
                - locked <0x000000041ae00180> (a java.util.ArrayDeque)
                at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:133)
                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:92)
                - locked <0x00000004be0002f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
                at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
                at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:78)
                at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
                at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
                at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
                at org.apache.flink.streaming.api.scala.DataStream$$anon$6$$anonfun$flatMap$1.apply(DataStream.scala:541)
                at scala.collection.immutable.List.foreach(List.scala:381)
                at org.apache.flink.streaming.api.scala.DataStream$$anon$6.flatMap(DataStream.scala:541)
                at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:48)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
                at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
                at org.apache.flink.streaming.api.operators.StreamFilter.processElement(StreamFilter.java:38)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
                at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
                at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:39)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:351)
                at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:337)
                at org.apache.flink.streaming.api.operators.StreamSource$ManualWatermarkContext.collect(StreamSource.java:318)
                - locked <0x000000041ae001c8> (a java.lang.Object)
                at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09$ConsumerThread.run(FlinkKafkaConsumer09.java:473)
                - locked <0x000000041ae001c8> (a java.lang.Object)

        and also:
        "OutputFlusher" #1244231 daemon prio=5 os_prio=0 tid=0x00007f2a39d4e800 nid=0x2e7d waiting for monitor entry [0x00007f2a3e5e4000]
           java.lang.Thread.State: BLOCKED (on object monitor)
                at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:172)
                - waiting to lock <0x00000004be0002f0> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
                at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:176)


        - it looks like the source tries to process the remaining Kafka
        messages, but it is stuck on partitioning by key.

        I managed to take a heap dump and look at what's going on inside
        LocalBufferPool:
        - the availableMemorySegments queue is empty
        - numberOfRequestedMemorySegments == currentPoolSize == 16
        - there are no registeredListeners

        Now, it seems that the loop at LocalBufferPool#142 runs without
        end, waiting for a buffer to be recycled, but from what I see that
        won't happen because the OutputFlusher is blocked by this loop.
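
The suspected interaction can be modelled outside Flink. The sketch below is a simplified illustration, not Flink code: BlockedFlusherSketch, serializerLock and bufferQueue are hypothetical stand-ins for the monitors shown in the two thread dumps, and it assumes no buffer is ever returned to the pool. One thread holds the serializer monitor while it waits in a timed loop for a buffer; the other needs the same monitor to flush.

```java
import java.util.concurrent.CountDownLatch;

class BlockedFlusherSketch {

    /** Starts both threads, lets them settle, and returns {emitter state, flusher state}. */
    static Thread.State[] observeStates() {
        final Object serializerLock = new Object(); // stands in for the SpanningRecordSerializer monitor
        final Object bufferQueue = new Object();    // stands in for the queue of available buffers
        final CountDownLatch emitterHoldsLock = new CountDownLatch(1);

        Thread emitter = new Thread(() -> {
            synchronized (serializerLock) {
                emitterHoldsLock.countDown();
                synchronized (bufferQueue) {
                    try {
                        while (true) {
                            // No buffer ever becomes available in this model.
                            bufferQueue.wait(2000);
                        }
                    } catch (InterruptedException e) {
                        // Interrupted: leave the loop and release both monitors.
                    }
                }
            }
        }, "emitter");

        Thread flusher = new Thread(() -> {
            synchronized (serializerLock) {
                // A real flusher would flush the pending data here.
            }
        }, "flusher");

        emitter.setDaemon(true);
        flusher.setDaemon(true);
        try {
            emitter.start();
            emitterHoldsLock.await();   // make sure the emitter owns the serializer monitor first
            flusher.start();
            Thread.sleep(500);          // give both threads time to reach their blocking points
            Thread.State[] states = { emitter.getState(), flusher.getState() };
            emitter.interrupt();        // clean up: break the wait loop so both threads can end
            emitter.join(2000);
            flusher.join(2000);
            return states;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Thread.State[] states = observeStates();
        System.out.println("emitter=" + states[0] + ", flusher=" + states[1]);
    }
}
```

In this model the flusher cannot make progress until the emitter leaves its wait loop, which is the same pairing of TIMED_WAITING and BLOCKED threads as in the dumps above. Whether the flusher is really the only path that could free a buffer in Flink is not something the sketch shows.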

        The problem seems to occur when we start a new job at more or
        less the same time as another job is cancelled (e.g. the
        taskmanager is restarted, one job is failing because of some
        problem, and another one is just starting), so I wonder whether
        it could be some problem with the setNumBuffers method, although
        it looks synchronized enough...

        We are using version 1.0.0 (RC4) btw

        I hope to dig further into this - but for now this is all I
        managed to find.

        thanks,
        maciek



