We have a Flink job with 7 subtasks (see the graph below). This is on Flink 1.2.

[image: job graph]

Each source task consumes from a Kafka topic. The data rate is low, around
70-80 messages per second. What we are noticing is that after running for 2 hours
or so, the source tasks start showing backpressure. A thread dump shows a bunch
of blocked threads like the following. Any idea what could be going on here?

"OutputFlusher" #87 daemon prio=5 os_prio=0 tid=0x0000000000b3b000 nid=0x73 waiting for monitor entry [0x00007f1d03af9000]
   java.lang.Thread.State: BLOCKED (on object monitor)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:175)
        - waiting to lock <0x000000060076fa80> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:185)

It is waiting on lock <0x000000060076fa80>, which is held by this thread:

"Time Trigger for Source:stream://com.tesseract.com/snmp_generic/envtemp -> FlatMap -> Map (1/1)" #92 daemon prio=5 os_prio=0 tid=0x0000000000c13000 nid=0x79 in Object.wait() [0x00007f1d032e5000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
        - locked <0x00000006063b5aa8> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        - locked <0x000000060076fa80> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:106)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:96)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:821)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:681)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:663)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:389)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:821)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:681)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:663)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:389)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:821)
        at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:142)
        at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:256)
        - locked <0x00000006038fc128> (a java.lang.Object)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
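Read together, the two stacks above form a lock chain. The "Time Trigger" thread takes the SpanningRecordSerializer monitor <0x000000060076fa80> in RecordWriter.sendToTarget. It then does a timed wait inside LocalBufferPool.requestBuffer. `Object.wait` releases only the monitor of the object it is called on, so the serializer monitor stays held. OutputFlusher needs that same monitor in RecordWriter.flush, so it stays BLOCKED until a buffer frees up.

The Java sketch below reproduces only that chain with plain JDK monitors, so the thread states can be compared against the dump. It is a simplified, hypothetical model, not Flink's code:
- `serializerLock`, `freeBuffers` and `requestBuffer` are made-up stand-ins.
- The pool is assumed to start empty and never be refilled, modelling a downstream task that has stopped consuming.
- The 2000 ms timeout is arbitrary.

```java
import java.util.ArrayDeque;

// Hypothetical model of the lock chain in the dump; not Flink's implementation.
public class LockChainSketch {
    // Stand-in for the SpanningRecordSerializer monitor <0x000000060076fa80>.
    static final Object serializerLock = new Object();
    // Stand-in for the pool's ArrayDeque of free buffers. It starts empty and nothing
    // ever recycles into it (assumption: the downstream task has stopped consuming).
    static final ArrayDeque<byte[]> freeBuffers = new ArrayDeque<>();

    // Timed wait on the deque's monitor, like the TIMED_WAITING frame in requestBuffer.
    static byte[] requestBuffer(long timeoutMs) throws InterruptedException {
        synchronized (freeBuffers) {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (freeBuffers.isEmpty()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null; // gave up: no buffer was recycled in time
                }
                // wait() releases only freeBuffers' monitor; serializerLock stays held.
                freeBuffers.wait(remaining);
            }
            return freeBuffers.poll();
        }
    }

    static void pause(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Starts a "writer" and a "flusher" and reports the flusher's state mid-stall.
    static Thread.State flusherStateWhileWriterStalls() {
        Thread writer = new Thread(() -> {
            synchronized (serializerLock) { // like sendToTarget holding the serializer
                try {
                    requestBuffer(2000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "writer");
        Thread flusher = new Thread(() -> {
            synchronized (serializerLock) { // like RecordWriter.flush
                // a real flush would hand the partially filled buffer downstream here
            }
        }, "OutputFlusher");

        writer.start();
        pause(100); // give the writer time to take serializerLock
        flusher.start();
        pause(200); // give the flusher time to reach the monitor
        Thread.State observed = flusher.getState();

        writer.interrupt(); // end the writer's wait early so both threads can finish
        try {
            writer.join();
            flusher.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return observed;
    }

    public static void main(String[] args) {
        System.out.println("flusher: " + flusherStateWhileWriterStalls());
    }
}
```

On a normal run the flusher should be reported as BLOCKED while the writer sits in its timed wait. This mirrors the "waiting for monitor entry" / TIMED_WAITING pair in the dump. The sleeps only make the interleaving likely; they do not guarantee it.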


"Source:stream://com.tesseract.com/metricstream/v1 -> FlatMap -> Map (1/1)" #76 daemon prio=5 os_prio=0 tid=0x00007f1d240b5000 nid=0x68 in Object.wait() [0x00007f1d08b4e000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
        at java.lang.Object.wait(Native Method)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168)
        - locked <0x0000000602c8daa8> (a java.util.ArrayDeque)
        at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131)
        - locked <0x0000000606572e98> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer)
        at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88)
        at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:72)
        at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:827)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:805)
        at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:827)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:805)
        at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
        at com.cisco.ndp.cep.impl.flinksiddhi.source.SourceToTupleFunction.flatMap(SourceToTupleFunction.java:142)
        at com.cisco.ndp.cep.impl.flinksiddhi.source.SourceToTupleFunction.flatMap(SourceToTupleFunction.java:40)
        at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422)
        at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:827)
        at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:805)
        at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84)
        - locked <0x000000060387f028> (a java.lang.Object)
        at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:245)
        - locked <0x000000060387f028> (a java.lang.Object)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.emitRecord(Kafka09Fetcher.java:198)
        at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)
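Two writer stacks appear in the dump, each in its own source task and on its own pool's ArrayDeque. One is the latency-marker timer for envtemp; the other is the Kafka fetch thread for metricstream. Both end in the same place: a timed wait in LocalBufferPool.requestBuffer. In other words, that task's output buffers are all in use and none are being handed back.

The sketch below is a rough model of why that shows up as backpressure even on a low-rate source. It uses a fixed-size pool, and a writer can keep emitting only while buffers are recycled. Everything in it is hypothetical and chosen for illustration:
- `BoundedPoolSketch`, `request`, `recycle` and `emit` are made-up names.
- The pool size of 4 and the 50 ms timeout are arbitrary.

Flink's real LocalBufferPool is more involved. This model also gives up after a single timeout so the example terminates, whereas the persistent TIMED_WAITING frames in the dump suggest the real writer keeps waiting.

```java
import java.util.ArrayDeque;

// Hypothetical fixed-size buffer pool; a simplified model, not Flink's LocalBufferPool.
public class BoundedPoolSketch {
    private final ArrayDeque<byte[]> available = new ArrayDeque<>();

    BoundedPoolSketch(int numBuffers, int bufferSize) {
        for (int i = 0; i < numBuffers; i++) {
            available.add(new byte[bufferSize]);
        }
    }

    // Returns a free buffer, or null if none is returned to the pool within timeoutMs.
    byte[] request(long timeoutMs) throws InterruptedException {
        synchronized (available) {
            long deadline = System.currentTimeMillis() + timeoutMs;
            while (available.isEmpty()) {
                long remaining = deadline - System.currentTimeMillis();
                if (remaining <= 0) {
                    return null;
                }
                available.wait(remaining); // the writer sits in TIMED_WAITING here
            }
            return available.poll();
        }
    }

    // Called when the (hypothetical) downstream side is done with a buffer.
    void recycle(byte[] buffer) {
        synchronized (available) {
            available.add(buffer);
            available.notifyAll();
        }
    }

    // Emits one buffer per record and stops at the first request that times out.
    // downstreamConsumes=true recycles each buffer right away (healthy consumer);
    // false never recycles (stalled consumer).
    static int emit(int poolSize, int records, boolean downstreamConsumes) {
        BoundedPoolSketch pool = new BoundedPoolSketch(poolSize, 1024); // size is irrelevant here
        int emitted = 0;
        for (int i = 0; i < records; i++) {
            byte[] buf;
            try {
                buf = pool.request(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
            if (buf == null) {
                break; // stalled: no buffer came back from downstream
            }
            emitted++;
            if (downstreamConsumes) {
                pool.recycle(buf);
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println("healthy consumer: " + emit(4, 10, true));
        System.out.println("stalled consumer: " + emit(4, 10, false));
    }
}
```

With a consumer that recycles, all 10 records go through. With a stalled one, the writer stops after 4 (the pool size), regardless of how slowly records arrive. So in this model the input rate alone does not prevent the stall; what matters is whether buffers come back.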