Hi Ilya,

These messages can pop up while a Kafka broker is down, but they should eventually disappear. So I'm a bit lost.
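The warning says that newer checkpoint offsets replaced older ones that were never committed. The model below assumes a single "newest pending offsets" slot that checkpoint completion fills and a committer thread drains. It shows why the message would repeat on every checkpoint for as long as the committer makes no progress, and why it would stop once the committer catches up, as after a broker comes back. This is a simplified, hypothetical model, not Flink's actual consumer code. `PendingCommitSlot`, `offer` and `take` are made-up names.

```scala
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

// Hypothetical, simplified model: NOT Flink's actual consumer code.
// Completed checkpoints hand their offsets (partition -> offset) to a
// committer through a single slot; only the newest pending offsets are kept.
final class PendingCommitSlot {
  // Newest offsets not yet picked up by the committer (null = nothing pending).
  private val next = new AtomicReference[Map[Int, Long]](null)
  // How often pending offsets were replaced before being committed.
  private val skipped = new AtomicInteger(0)

  // Called when a checkpoint completes. Returns true when it overwrote offsets
  // the committer never picked up, i.e. the "Skipping commit of previous
  // offsets" situation described by the log message.
  def offer(offsets: Map[Int, Long]): Boolean = {
    val previous = next.getAndSet(offsets)
    if (previous != null) {
      skipped.incrementAndGet()
      true
    } else {
      false
    }
  }

  // Called by the committer: takes the newest pending offsets, if any.
  def take(): Option[Map[Int, Long]] = Option(next.getAndSet(null))

  def skippedCount: Int = skipped.get()
}

object PendingCommitDemo {
  def main(args: Array[String]): Unit = {
    val slot = new PendingCommitSlot

    // Committer keeps up: offsets are taken before the next checkpoint completes.
    slot.offer(Map(0 -> 100L))
    slot.take()

    // Committer is stuck: three checkpoints complete without a take().
    slot.offer(Map(0 -> 200L))
    slot.offer(Map(0 -> 300L))
    slot.offer(Map(0 -> 400L))

    println(slot.skippedCount) // 2
    println(slot.take())       // Some(Map(0 -> 400))
  }
}
```

In this model, nothing is lost for recovery purposes, because the newest offsets always win. Only the intermediate commits are dropped. That matches the log message's note that checkpoint integrity is not compromised.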

If there was a bug, it has most likely been fixed in the meantime. So, to be on the safe side, I'd try upgrading to more recent versions (Flink + Kafka consumer).

Best,
Arvid

On Wed, Jun 2, 2021 at 7:01 PM Ilya Karpov <idkf...@gmail.com> wrote:
> Hi there,
>
> today I've observed strange behaviour of a Flink streaming application
> (Flink 1.6.1, per-job cluster deployment, YARN):
> 3 task managers (2 slots each) are running, but only 1 slot is actually
> consuming messages from Kafka (v0.11.0.2); the others were idling
> (currentOutputWatermark was stuck, and the numRecordsOutPerSecond metric was 0).
>
> So I started to investigate:
> - `kafka-run-class.sh kafka.tools.GetOffsetShell` showed that offsets for
>   all 6 topic partitions are constantly increasing.
> - `kafka-consumer-groups.sh` listed only a single (the 4th) partition. That
>   makes me think that somehow 5 Kafka consumers lost their connection to
>   the brokers.
> - A LOT of messages "Committing offsets to Kafka takes longer than the
>   checkpoint interval. Skipping commit of previous offsets because newer
>   complete checkpoint offsets are available. This does not compromise Flink's
>   checkpoint integrity." in each task manager instance.
> - 5 of 6 slots didn't advance currentOutputWatermark for about 3 days.
> - no application errors/uncaught exceptions etc.
> - no reconnections to Kafka.
> - some network issues connected with HDFS (Slow waitForAckedSeqno).
> - all Kafka networking settings are default (e.g. timeouts).
>
> After a job restart, all task managers started to consume messages (6 slots
> in total, and `kafka-consumer-groups.sh` listed all 6 partitions as
> consumed).
>
> Maybe someone has already experienced something similar?
>
> Job topology is as follows (no window operations!):
> ```
> val dataStream = env.addSource(kafkaSource).map(processor)
>
> val terminalStream = AsyncDataStream
>   .unorderedWait(dataStream, asyncFun, timeout, timeoutUnit)
>   .process(sideOutputFun)
>
> terminalStream
>   .keyBy(selector)
>   .process(keyProcFun)
>   .addSink(kafkaSink_1)
>
> terminalStream
>   .getSideOutput(outputTag) // an OutputTag instance, e.g. OutputTag[T]("outputTag"), not a String
>   .addSink(kafkaSink_2)
> ```
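The investigation compares two views of the same topic: log-end offsets (from `GetOffsetShell`) and the group's committed offsets (from `kafka-consumer-groups.sh`). The sketch below captures that comparison. It assumes you have taken two snapshots of each some time apart and parsed them into partition-to-offset maps. `StalledPartitions.find` is a hypothetical helper, not part of Kafka's tooling, and the numbers in the demo are made up.

```scala
// Hypothetical helper (not part of Kafka's tooling). Inputs are
// partition -> offset maps parsed from two snapshots taken some time apart:
// log-end offsets (e.g. from GetOffsetShell) and committed group offsets
// (e.g. from kafka-consumer-groups.sh).
object StalledPartitions {
  // Partitions that received new records while their committed offset did not
  // move (or that have no committed offset in the second snapshot).
  def find(
      endBefore: Map[Int, Long],
      endAfter: Map[Int, Long],
      committedBefore: Map[Int, Long],
      committedAfter: Map[Int, Long]
  ): Vector[Int] =
    endAfter.keys.toVector.sorted.filter { p =>
      val producedNew = endAfter(p) > endBefore.getOrElse(p, 0L)
      val committedMoved = (committedBefore.get(p), committedAfter.get(p)) match {
        case (Some(before), Some(after)) => after > before
        case (None, Some(_))             => true  // first commit appeared
        case _                           => false // no commit in the second snapshot
      }
      producedNew && !committedMoved
    }
}

object StalledPartitionsDemo {
  def main(args: Array[String]): Unit = {
    // Made-up numbers: all 6 log ends grew, only partition 3's commit moved.
    val endBefore = (0 until 6).map(p => p -> 1000L).toMap
    val endAfter = (0 until 6).map(p => p -> 1500L).toMap
    val committedBefore = (0 until 6).map(p => p -> 900L).toMap
    val committedAfter = committedBefore.updated(3, 1450L)
    println(StalledPartitions.find(endBefore, endAfter, committedBefore, committedAfter))
    // Vector(0, 1, 2, 4, 5)
  }
}
```

A flagged partition only means the committed offset stopped moving. Flink's Kafka consumer commits offsets back to Kafka as a side effect of completed checkpoints, so a flagged partition can point either to a stuck consumer or to commits that are not going through. Metrics such as numRecordsOutPerSecond help tell the two apart.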