Hi there, today I observed strange behaviour in a Flink streaming application (Flink 1.6.1, per-job cluster deployment on YARN): 3 task managers (2 slots each) were running, but only 1 slot was actually consuming messages from Kafka (v0.11.0.2); the others were idling (currentOutputWatermark was stuck and numRecordsOutPerSecond was 0).
So I started to investigate:

- `kafka-run-class.sh kafka.tools.GetOffsetShell` showed that the offsets of all 6 topic partitions are constantly increasing.
- `kafka-consumer-groups.sh` listed only a single (the 4th) partition, which makes me think that somehow 5 Kafka consumers lost their connection to the brokers.
- A LOT of messages "Committing offsets to Kafka takes longer than the checkpoint interval. Skipping commit of previous offsets because newer complete checkpoint offsets are available. This does not compromise Flink's checkpoint integrity." in every task manager's log.
- 5 of the 6 slots didn't advance currentOutputWatermark for about 3 days.
- No application errors, uncaught exceptions, etc.
- No reconnections to Kafka.
- Some network issues related to HDFS (slow waitForAckedSeqno).
- All Kafka networking settings (e.g. timeouts) are at their defaults; see the config sketch below.

After a job restart all task managers started consuming again (6 slots in total, and `kafka-consumer-groups.sh` listed all 6 partitions as consumed).

Has anyone experienced something similar?

The job topology is as follows (no window operations!):

```
val dataStream = env.addSource(kafkaSource).map(processor)

val terminalStream = AsyncDataStream
  .unorderedWait(dataStream, asyncFun, timeout, timeoutUnit)
  .process(sideOutputFun)

terminalStream
  .keyBy(selector)
  .process(keyProcFun)
  .addSink(kafkaSink_1)

terminalStream
  .getSideOutput(outputTag)
  .addSink(kafkaSink_2)
```
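For context, the side output above is emitted from inside the process function via an OutputTag. A minimal sketch of that wiring (the tag id, element type and function body here are placeholders, not my real code):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.util.Collector

// Hypothetical tag; the real id and element type differ in the actual job.
val outputTag = OutputTag[String]("side-output")

val sideOutputFun = new ProcessFunction[String, String] {
  override def processElement(value: String,
                              ctx: ProcessFunction[String, String]#Context,
                              out: Collector[String]): Unit = {
    out.collect(value)            // main output -> keyBy + kafkaSink_1
    ctx.output(outputTag, value)  // side output -> kafkaSink_2
  }
}
```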
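Since all the Kafka networking settings are at their defaults, one thing I'm considering is passing explicit consumer properties to the source so a silently dead connection is noticed sooner. A rough sketch of what I have in mind (assuming the 0.11 connector; the broker list, topic, group id and timeout values are made-up examples, not my actual config):

```scala
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val props = new Properties()
props.setProperty("bootstrap.servers", "broker-1:9092,broker-2:9092") // placeholder brokers
props.setProperty("group.id", "my-flink-job")                         // placeholder group id
// Example values only; the point is to set them explicitly instead of relying on defaults.
props.setProperty("session.timeout.ms", "15000")
props.setProperty("heartbeat.interval.ms", "5000")
props.setProperty("request.timeout.ms", "40000")

val kafkaSource = new FlinkKafkaConsumer011[String]("my-topic", new SimpleStringSchema(), props)
// Offsets go back to Kafka only when a checkpoint completes (the default with
// checkpointing enabled); slow commits here are what produce the
// "Committing offsets to Kafka takes longer than the checkpoint interval" message.
kafkaSource.setCommitOffsetsOnCheckpoints(true)
```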