Hi Arvid,
thanks for the reply,

Researching the thread dumps and logs didn't help by itself. We suspected that
the problem was in the async call to the remote key-value storage because we
(1) found that the async client timeout was set to 0 (effectively no timeout,
i.e. idle indefinitely), (2) saw that the async client threads were sleeping,
and (3) saw that the AsyncWaitOperator.Emitter thread was blocked peeking for a
new async result while AsyncWaitOperator.processWatermark was blocked trying to
put a new item into the queue. We changed the timeout to a non-zero value and
since then (for a week or so) the job hasn't hung. So I guess the problem was
the async client timeout (not Kafka or Flink).
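
In case it's useful to others, here is a minimal Java sketch of the shape of
the fix. KvClient, its builder, its requestTimeoutMillis option and its
CompletableFuture-returning get() are hypothetical stand-ins for whatever async
storage client you use; the real point is the non-zero per-request timeout (and
completing the ResultFuture even on failure, so the operator can release the
queue slot):

```
import java.util.Collections;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;

// Async lookup into the remote KV storage; KvClient is a hypothetical client.
public class KvLookupFunction extends RichAsyncFunction<String, String> {

    private transient KvClient client;

    @Override
    public void open(Configuration parameters) {
        client = KvClient.builder()
                // The change that mattered: a non-zero per-request timeout
                // instead of 0 ("wait forever").
                .requestTimeoutMillis(2000)
                .build();
    }

    @Override
    public void asyncInvoke(String key, ResultFuture<String> resultFuture) {
        client.get(key).whenComplete((value, error) -> {
            if (error != null) {
                // Complete the future even on failure/timeout so the
                // AsyncWaitOperator can evict the entry from its queue.
                resultFuture.completeExceptionally(error);
            } else {
                resultFuture.complete(Collections.singletonList(value));
            }
        });
    }

    @Override
    public void close() {
        if (client != null) {
            client.close();
        }
    }
}
```

(The timeout/timeoutUnit arguments of AsyncDataStream.unorderedWait are a
separate, operator-level timeout; in our case it was the client-side one that
was missing.)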

Hope this helps someone!

> On 9 June 2021, at 14:10, Arvid Heise <ar...@apache.org> wrote:
> 
> Hi Ilya,
> 
> These messages could pop up when a Kafka broker is down but should eventually 
> disappear. So I'm a bit lost. 
> 
> If there was a bug, it's also most likely fixed in the meantime. So if you 
> want to be on the safe side, I'd try to upgrade to more recent versions 
> (Flink + Kafka consumer).
> 
> Best,
> 
> Arvid
> 
> On Wed, Jun 2, 2021 at 7:01 PM Ilya Karpov <idkf...@gmail.com> wrote:
> Hi there,
> 
> today I've observed strange behaviour of a Flink streaming application (Flink 
> 1.6.1, per-job cluster deployment, YARN): 3 task managers (2 slots each) were 
> running, but only 1 slot was actually consuming messages from Kafka 
> (v0.11.0.2); the others were idling (currentOutputWatermark was stuck and 
> numRecordsOutPerSecond was 0). 
> 
> So I started to investigate: 
> - `kafka-run-class.sh kafka.tools.GetOffsetShell` showed that offsets for all 
> 6 topic partitions were constantly increasing.
> - `kafka-consumer-groups.sh` listed only a single partition (the 4th). That 
> makes me think that somehow 5 Kafka consumers lost their connection to the brokers.
> - A LOT of messages "Committing offsets to Kafka takes longer than the 
> checkpoint interval. Skipping commit of previous offsets because newer 
> complete checkpoint offsets are available. This does not compromise Flink's 
> checkpoint integrity." in each task manager instance.
> - 5 of 6 slots didn't advance currentOutputWatermark for about 3 days.
> - no application errors/uncaught exceptions etc.
> - no reconnections to kafka.
> - some network issues related to HDFS (Slow waitForAckedSeqno).
> - all Kafka networking settings are at their defaults (e.g. timeouts); see the 
> sketch right after this list.
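> 
> For reference, here is a sketch of how those consumer timeouts could be set 
> explicitly on the Flink Kafka consumer; the property values below are only 
> illustrative, not our actual configuration:
> 
> ```
> import java.util.Properties;
> 
> import org.apache.flink.api.common.serialization.SimpleStringSchema;
> import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
> 
> Properties props = new Properties();
> props.setProperty("bootstrap.servers", "broker1:9092");  // illustrative
> props.setProperty("group.id", "my-flink-job");           // illustrative
> // Standard Kafka consumer timeouts, normally left at their defaults:
> props.setProperty("session.timeout.ms", "10000");
> props.setProperty("request.timeout.ms", "305000");
> props.setProperty("heartbeat.interval.ms", "3000");
> 
> FlinkKafkaConsumer011<String> kafkaSource =
>     new FlinkKafkaConsumer011<>("my-topic", new SimpleStringSchema(), props);
> ```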
> 
> After a job restart, all task managers started to consume messages (6 slots in 
> total, and `kafka-consumer-groups.sh` showed that all 6 partitions were being 
> consumed).
> 
> Maybe someone has already experienced something similar?
> 
> Job topology is as follows (no window operations!):
> ```
> val dataStream = env.addSource(kafkaSource).map(processor);
> 
> val terminalStream = AsyncDataStream
>     .unorderedWait(dataStream, asyncFun, timeout, timeoutUnit)
>     .process(sideOutputFun);
> 
> terminalStream
>     .keyBy(selector)
>     .process(keyProcFun)
>     .addSink(kafkaSink_1);
> 
> terminalStream
>     .getSideOutput("outputTag")
>     .addSink(kafkaSink_2);
> ```
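> 
> ("outputTag" above is just shorthand: getSideOutput actually takes an 
> OutputTag instance that the ProcessFunction emits to. Below is a trimmed Java 
> sketch of that part, with placeholder String types; asyncStream and 
> kafkaSink_2 stand for the stream and sink from the pseudocode above.)
> 
> ```
> import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
> import org.apache.flink.streaming.api.functions.ProcessFunction;
> import org.apache.flink.util.Collector;
> import org.apache.flink.util.OutputTag;
> 
> // The anonymous subclass ({}) lets Flink capture the side output's type.
> final OutputTag<String> outputTag = new OutputTag<String>("outputTag") {};
> 
> SingleOutputStreamOperator<String> terminalStream = asyncStream
>     .process(new ProcessFunction<String, String>() {
>         @Override
>         public void processElement(String value, Context ctx, Collector<String> out) {
>             out.collect(value);            // main output -> keyBy -> kafkaSink_1
>             ctx.output(outputTag, value);  // side output -> kafkaSink_2
>         }
>     });
> 
> terminalStream.getSideOutput(outputTag).addSink(kafkaSink_2);
> ```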
