Hi Ilya,

These messages can appear when a Kafka broker is down, but they should
eventually disappear. So I'm a bit lost.
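My understanding (from the Kafka connector code in general, not checked against 1.6.1 specifically) is that the "Committing offsets to Kafka takes longer than the checkpoint interval" warning comes from a "latest offsets win" hand-off: when a checkpoint completes, its offsets are parked for the Kafka consumer thread, which picks them up on its next loop iteration. If the previous offsets are still parked, they are overwritten and the warning is logged. A consumer thread that is blocked (e.g. because a broker is unreachable) would therefore trigger the warning on every checkpoint. Below is a simplified model of that mechanism; the class and method names are hypothetical and this is not the actual Flink code:

```scala
import java.util.concurrent.atomic.{AtomicInteger, AtomicReference}

// Hypothetical, simplified model of the hand-off between checkpoint completion
// and the Kafka consumer thread. Partition -> offset maps stand in for real offsets.
final class OffsetCommitHandoff {
  // Offsets waiting for the consumer thread; null means nothing is pending
  private val pending = new AtomicReference[Map[Int, Long]](null)
  // How many times older offsets were replaced before being committed
  private val skipped = new AtomicInteger(0)

  def skippedCommits: Int = skipped.get()

  // Called when a checkpoint completes
  def onCheckpointComplete(offsets: Map[Int, Long]): Unit =
    if (pending.getAndSet(offsets) != null) {
      // The consumer thread never picked up the previous offsets:
      // they are dropped in favour of the newer ones (this is where the warning would be logged)
      skipped.incrementAndGet()
    }

  // Called by the consumer thread in its loop; takes whatever is pending
  def pollOffsetsToCommit(): Option[Map[Int, Long]] = Option(pending.getAndSet(null))
}
```

For example, two `onCheckpointComplete` calls without a `pollOffsetsToCommit` in between leave `skippedCommits == 1`, and the next poll returns only the newer offsets. Skipping older offsets is harmless because the newer ones supersede them, which matches the "does not compromise Flink's checkpoint integrity" part of the message.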

If there was a bug, it has most likely been fixed in the meantime. So to be
on the safe side, I'd try upgrading to more recent versions
(Flink + Kafka consumer).

Best,

Arvid

On Wed, Jun 2, 2021 at 7:01 PM Ilya Karpov <idkf...@gmail.com> wrote:

> Hi there,
>
> today I observed strange behaviour in a Flink streaming application
> (Flink 1.6.1, per-job cluster deployment on YARN):
> 3 task managers (2 slots each) were running, but only 1 slot was actually
> consuming messages from Kafka (v0.11.0.2); the others were idle
> (currentOutputWatermark was stuck and numRecordsOutPerSecond was 0).
>
> So I started to investigate:
> - `kafka-run-class.sh kafka.tools.GetOffsetShell` showed that the offsets of
> all 6 topic partitions were constantly increasing.
> - `kafka-consumer-groups.sh` listed only a single partition (the 4th). That
> makes me think that somehow 5 Kafka consumers lost their connection to the
> brokers.
> - A LOT of "Committing offsets to Kafka takes longer than the
> checkpoint interval. Skipping commit of previous offsets because newer
> complete checkpoint offsets are available. This does not compromise Flink's
> checkpoint integrity." messages in each task manager instance.
> - 5 of 6 slots didn't advance currentOutputWatermark for about 3 days.
> - no application errors, uncaught exceptions, etc.
> - no reconnections to Kafka.
> - some network issues related to HDFS (Slow waitForAckedSeqno).
> - all Kafka networking settings are default (e.g. timeouts).
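The comparison you did by hand can be expressed as a small check over two offset snapshots taken some time apart. This is only a sketch: it assumes you have already parsed the GetOffsetShell output (log-end offsets) and the kafka-consumer-groups.sh output (committed offsets) into partition -> offset maps; the `StallCheck` name and that parsing step are hypothetical.

```scala
// Hypothetical helper: flags partitions whose log-end offset advanced between two
// snapshots while the committed offset did not (data arrives, nobody consumes it).
object StallCheck {
  type Offsets = Map[Int, Long] // partition -> offset

  def stalledPartitions(
      endBefore: Offsets,
      endAfter: Offsets,
      committedBefore: Offsets,
      committedAfter: Offsets): Set[Int] =
    endAfter.keySet.filter { p =>
      // New data was written to the partition between the snapshots
      val produced = endAfter(p) > endBefore.getOrElse(p, 0L)
      // The committed offset moved forward (or appeared for the first time)
      val consumed = (committedBefore.get(p), committedAfter.get(p)) match {
        case (Some(before), Some(after)) => after > before
        case (None, Some(_))             => true
        case _                           => false
      }
      produced && !consumed
    }
}
```

With checkpointing enabled, the Flink consumer commits offsets only when a checkpoint completes, so the two snapshots should be taken at least a few checkpoint intervals apart to avoid false positives.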
>
> After a job restart, all task managers started consuming messages (6 slots
> in total, and `kafka-consumer-groups.sh` showed all 6 partitions being
> consumed).
>
> Maybe someone has already experienced something similar?
>
> Job topology is as follows (no window operations!):
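> ```
> import org.apache.flink.streaming.api.scala._
>
> // must be the same tag sideOutputFun emits to; element type String is assumed
> val outputTag = OutputTag[String]("outputTag")
>
> val dataStream = env.addSource(kafkaSource).map(processor)
>
> // async I/O (results in any order), then a ProcessFunction that also emits to outputTag
> val terminalStream = AsyncDataStream
>     .unorderedWait(dataStream, asyncFun, timeout, timeoutUnit)
>     .process(sideOutputFun)
>
> terminalStream
>     .keyBy(selector)
>     .process(keyProcFun)
>     .addSink(kafkaSink_1)
>
> // getSideOutput expects an OutputTag, not a plain String
> terminalStream
>     .getSideOutput(outputTag)
>     .addSink(kafkaSink_2)
> ```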
