We have a flink job with 7 subtasks. See graph below. This is on flink 1.2.
[cid:image001.png@01D31EC7.FB4B5620] Here each source task consumes from a kafka topic. Data rate is low around 70-80 messages per sec. What we are noticing is after running for 2 hours or so the source tasks starts showing up back pressure. A thread dump shows the following – A bunch of blocked threads like this . Any idea what could be going here? "OutputFlusher" #87 daemon prio=5 os_prio=0 tid=0x0000000000b3b000 nid=0x73 waiting for monitor entry [0x00007f1d03af9000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.flush(RecordWriter.java:175) - waiting to lock <0x000000060076fa80> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer) at org.apache.flink.streaming.runtime.io.StreamRecordWriter$OutputFlusher.run(StreamRecordWriter.java:185) Waiting on "Time Trigger for Source:stream://com.tesseract.com/snmp_generic/envtemp -> FlatMap -> Map (1/1)" #92 daemon prio=5 os_prio=0 tid=0x0000000000c13000 nid=0x79 in Object.wait() [0x00007f1d032e5000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(Native Method) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168) - locked <0x00000006063b5aa8> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131) - locked <0x000000060076fa80> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.randomEmit(RecordWriter.java:106) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.randomEmit(StreamRecordWriter.java:104) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.emitLatencyMarker(RecordWriterOutput.java:96) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:821) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:681) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:663) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:389) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:821) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.reportOrForwardLatencyMarker(AbstractStreamOperator.java:681) at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processLatencyMarker(AbstractStreamOperator.java:663) at org.apache.flink.streaming.runtime.tasks.OperatorChain$ChainingOutput.emitLatencyMarker(OperatorChain.java:389) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.emitLatencyMarker(AbstractStreamOperator.java:821) at org.apache.flink.streaming.api.operators.StreamSource$LatencyMarksEmitter$1.onProcessingTime(StreamSource.java:142) at org.apache.flink.streaming.runtime.tasks.SystemProcessingTimeService$RepeatedTriggerTask.run(SystemProcessingTimeService.java:256) - locked <0x00000006038fc128> (a java.lang.Object) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) "Source:stream://com.tesseract.com/metricstream/v1 -> FlatMap -> Map (1/1)" #76 daemon prio=5 os_prio=0 tid=0x00007f1d240b5000 nid=0x68 in Object.wait() [0x00007f1d08b4e000] java.lang.Thread.State: TIMED_WAITING (on object monitor) at java.lang.Object.wait(Native Method) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBuffer(LocalBufferPool.java:168) - locked <0x0000000602c8daa8> (a java.util.ArrayDeque) at org.apache.flink.runtime.io.network.buffer.LocalBufferPool.requestBufferBlocking(LocalBufferPool.java:138) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.sendToTarget(RecordWriter.java:131) - locked <0x0000000606572e98> (a org.apache.flink.runtime.io.network.api.serialization.SpanningRecordSerializer) at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:88) at org.apache.flink.streaming.runtime.io.StreamRecordWriter.emit(StreamRecordWriter.java:86) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:72) at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:39) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:827) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:805) at org.apache.flink.streaming.api.operators.StreamMap.processElement(StreamMap.java:38) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:827) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:805) at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51) at com.cisco.ndp.cep.impl.flinksiddhi.source.SourceToTupleFunction.flatMap(SourceToTupleFunction.java:142) at com.cisco.ndp.cep.impl.flinksiddhi.source.SourceToTupleFunction.flatMap(SourceToTupleFunction.java:40) at org.apache.flink.streaming.api.operators.StreamFlatMap.processElement(StreamFlatMap.java:47) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:422) at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:407) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:827) at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:805) at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:84) - locked <0x000000060387f028> (a java.lang.Object) at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecord(AbstractFetcher.java:245) - locked <0x000000060387f028> (a java.lang.Object) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.emitRecord(Kafka09Fetcher.java:198) at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:157) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256)