Hi there,

today I observed some strange behaviour in a Flink streaming application (Flink 
1.6.1, per-job cluster deployment on YARN):
3 task managers (2 slots each) were running, but only 1 slot was actually 
consuming messages from Kafka (v0.11.0.2); the others were idling 
(currentOutputWatermark was stuck and numRecordsOutPerSecond was 0).

So I started to investigate: 
- `kafka-run-class.sh kafka.tools.GetOffsetShell` showed that offsets for all 6 
topic partitions were constantly increasing.
- `kafka-consumer-groups.sh` listed only a single (the 4th) partition, which 
makes me think that somehow 5 of the Kafka consumers lost their connection to 
the brokers.
- Each task manager instance logged a LOT of messages saying "Committing 
offsets to Kafka takes longer than the checkpoint interval. Skipping commit of 
previous offsets because newer complete checkpoint offsets are available. This 
does not compromise Flink's checkpoint integrity."
- 5 of the 6 slots had not advanced currentOutputWatermark for about 3 days.
- no application errors, uncaught exceptions, etc.
- no reconnections to Kafka.
- some network issues related to HDFS (slow waitForAckedSeqno warnings).
- all Kafka networking settings (e.g. timeouts) are left at their defaults (see 
the sketch after this list).
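
For reference, a minimal sketch of how the Kafka source is configured; the 
broker list, group id, topic name and checkpoint interval below are 
placeholders, not the real values:

```
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val env = StreamExecutionEnvironment.getExecutionEnvironment
// Offsets are only committed back to Kafka on completed checkpoints, which is
// where the "Committing offsets to Kafka takes longer than the checkpoint
// interval" warning comes from when commits are slow.
env.enableCheckpointing(60000) // placeholder checkpoint interval (ms)

// Only bootstrap.servers and group.id are set, so all networking-related
// settings (request.timeout.ms, session.timeout.ms, ...) use client defaults.
val props = new Properties()
props.setProperty("bootstrap.servers", "broker-1:9092") // placeholder brokers
props.setProperty("group.id", "my-flink-job")           // placeholder group id

val kafkaSource = new FlinkKafkaConsumer011[String](
  "my-topic",                 // placeholder topic (6 partitions)
  new SimpleStringSchema(),
  props)
// Default behaviour when checkpointing is enabled.
kafkaSource.setCommitOffsetsOnCheckpoints(true)
```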

After a job restart all task managers started consuming messages (6 slots in 
total, and `kafka-consumer-groups.sh` showed all 6 partitions being consumed).

Maybe someone has already experienced something similar?

Job topology is as follows (no window operations!):
```
val dataStream = env.addSource(kafkaSource).map(processor);

val terminalStream = AsyncDataStream
    .unorderedWait(dataStream, asyncFun, timeout, timeoutUnit)
    .process(sideOutputFun);

terminalStream
    .keyBy(selector)
    .process(keyProcFun)
    .addSink(kafkaSink_1);

terminalStream
    .getSideOutput("outputTag")
    .addSink(kafkaSink_2);
```
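
And a sketch of how the side output in that snippet is wired up; the tag name, 
element types and routing condition here are placeholders:

```
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

// The tag passed to getSideOutput(outputTag) above.
val outputTag = OutputTag[String]("outputTag")

// sideOutputFun: sends matching records to the main output (keyProcFun ->
// kafkaSink_1) and everything else to the side output (kafkaSink_2).
val sideOutputFun = new ProcessFunction[String, String] {
  override def processElement(value: String,
                              ctx: ProcessFunction[String, String]#Context,
                              out: Collector[String]): Unit = {
    if (value.startsWith("terminal")) {  // placeholder routing condition
      out.collect(value)
    } else {
      ctx.output(outputTag, value)
    }
  }
}
```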
