Hi Arvid, thanks for the reply. The thread dump + log research didn't help. We suspected that the problem was in the async call to a remote key-value storage, because we (1) found that the async client timeout was set to 0 (effectively no timeout, i.e. it could idle infinitely), (2) saw that the async client threads were sleeping, and (3) saw that the AsyncWaitOperator.Emitter thread was blocked peeking for a new async result while AsyncWaitOperator.processWatermark was blocked trying to put a new item into the queue. We changed the timeout to a non-zero value, and since then (for a week or so) the job hasn't hung. So I guess the problem was the async client timeout (not Kafka or Flink).
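For anyone who hits the same symptom, the change boils down to giving the async client a finite request timeout (and, optionally, bounding each future at the call site as well). This is only a rough sketch: `RemoteKvClient`, its `builder()`/`requestTimeout()`/`get()` methods, `key` and the result type are hypothetical stand-ins for whatever async client you use; only `CompletableFuture.orTimeout` (Java 9+) is real API.

```
import java.time.Duration
import java.util.concurrent.TimeUnit

// `RemoteKvClient` and its builder/methods are hypothetical stand-ins for the
// actual async key-value client; set the equivalent option in your client.
val kvClient = RemoteKvClient.builder()
  .requestTimeout(Duration.ofSeconds(5)) // was 0, i.e. "wait forever" - the root cause of the hang
  .build()

// Optional extra guard: bound each individual call too, so one lost response
// can never keep the AsyncWaitOperator queue full.
val valueFuture = kvClient.get(key)      // assumed to return a CompletableFuture
  .orTimeout(5, TimeUnit.SECONDS)        // real Java 9+ CompletableFuture API
```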
Hope this helps someone!

> On 9 June 2021, at 14:10, Arvid Heise <ar...@apache.org> wrote:
>
> Hi Ilya,
>
> These messages could pop up when a Kafka broker is down but should eventually
> disappear. So I'm a bit lost.
>
> If there was a bug, it's also most likely fixed in the meantime. So if you
> want to be on the safe side, I'd try to upgrade to more recent versions
> (Flink + Kafka consumer).
>
> Best,
>
> Arvid
>
> On Wed, Jun 2, 2021 at 7:01 PM Ilya Karpov <idkf...@gmail.com> wrote:
> Hi there,
>
> today I've observed strange behaviour of a Flink streaming application (Flink
> 1.6.1, per-job cluster deployment, YARN):
> 3 task managers (2 slots each) are running, but only 1 slot is actually
> consuming messages from Kafka (v0.11.0.2); the others were idling
> (currentOutputWatermark was stuck, and numRecordsOutPerSecond was 0).
>
> So I started to investigate:
> - `kafka-run-class.sh kafka.tools.GetOffsetShell` showed that offsets for all
> 6 topic partitions are constantly increasing.
> - `kafka-consumer-groups.sh` listed only a single (the 4th) partition. That
> makes me think that somehow 5 Kafka consumers lost connection to the brokers.
> - A LOT of messages "Committing offsets to Kafka takes longer than the
> checkpoint interval. Skipping commit of previous offsets because newer
> complete checkpoint offsets are available. This does not compromise Flink's
> checkpoint integrity." in each task manager instance.
> - 5/6 slots didn't advance currentOutputWatermark for about 3 days.
> - no application errors/uncaught exceptions etc.
> - no reconnections to Kafka.
> - some network issues related to HDFS (slow waitForAckedSeqno).
> - all Kafka networking settings are default (e.g. timeouts).
>
> After a job restart all task managers started to consume messages (6 slots in
> total, and `kafka-consumer-groups.sh` listed that all 6 partitions are
> consumed).
>
> Maybe someone has already experienced something similar?
>
> The job topology is as follows (no window operations!):
> ```
> val dataStream = env.addSource(kafkaSource).map(processor);
>
> val terminalStream = AsyncDataStream
>   .unorderedWait(dataStream, asyncFun, timeout, timeoutUnit)
>   .process(sideOutputFun);
>
> terminalStream
>   .keyBy(selector)
>   .process(keyProcFun)
>   .addSink(kafkaSink_1);
>
> terminalStream
>   .getSideOutput("outputTag")
>   .addSink(kafkaSink_2);
> ```