Hi,

What’s your KafkaConsumer configuration? Especially:
- Is checkpointing enabled?
- What are the values of enable.auto.commit (or auto.commit.enable for Kafka 0.8) and auto.commit.interval.ms?
- Did you call setCommitOffsetsOnCheckpoints()?
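To show why these three settings matter together, here is a minimal sketch of how they combine, as I understand it from the Flink Kafka connector documentation. It is a plain-Java model for illustration only, not Flink's actual code: the class name `OffsetCommitSketch`, the `Mode` enum and the `resolve` method are hypothetical, and the fallback values (`true` and `5000`) are assumed to match the Kafka client defaults.

```java
import java.util.Properties;

/** Illustrative model of the offset-committing decision; not Flink's implementation. */
final class OffsetCommitSketch {

    /** Hypothetical names for the three possible outcomes. */
    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static Mode resolve(boolean checkpointingEnabled,
                        boolean commitOffsetsOnCheckpoints,
                        Properties kafkaProps) {
        if (checkpointingEnabled) {
            // With checkpointing on, the auto-commit properties are ignored;
            // only the setCommitOffsetsOnCheckpoints(...) flag decides.
            return commitOffsetsOnCheckpoints ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // Without checkpointing, committing is left to the Kafka client's
        // periodic auto-commit (defaults assumed: true / 5000 ms).
        boolean autoCommit = Boolean.parseBoolean(
                kafkaProps.getProperty("enable.auto.commit", "true"));
        long intervalMs = Long.parseLong(
                kafkaProps.getProperty("auto.commit.interval.ms", "5000"));
        return (autoCommit && intervalMs > 0) ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");

        // Checkpointing disabled: the Kafka client commits periodically.
        System.out.println(resolve(false, true, props));
        // Checkpointing enabled: offsets are committed when checkpoints complete.
        System.out.println(resolve(true, true, props));
    }
}
```

If this model is right, knowing which of the three cases your job is in tells us whether the commits are issued by Flink on completed checkpoints or by the Kafka client itself.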
Please also refer to
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
especially this part:

> Note that the Flink Kafka Consumer does not rely on the committed offsets for
> fault tolerance guarantees. The committed offsets are only a means to expose
> the consumer’s progress for monitoring purposes.

Can you post the full logs from all TaskManagers and the JobManager, and can you say or estimate when the committing broke/stopped? Did you check the Kafka logs for any errors?

To me it seems more like a Kafka issue/bug:
https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232

Especially since in your case the offset committing failure is preceded by a Kafka coordinator failure.

Piotrek

> On 8 Jun 2018, at 10:05, Juho Autio <juho.au...@rovio.com> wrote:
>
> Hi,
>
> We have a Flink stream job that uses Flink kafka consumer. Normally it
> commits consumer offsets to Kafka.
>
> However this stream ended up in a state where it's otherwise working just
> fine, but it isn't committing offsets to Kafka any more. The job keeps
> writing correct aggregation results to the sink, though. At the time of
> writing this, the job has been running 14 hours without committing offsets.
>
> Below is an extract from taskmanager.log. As you can see, it didn't log
> anything until ~2018-06-07 22:08. Also that's where the log ends, these are
> the last lines so far.
>
> Could you help check if this is a known bug, possibly already fixed, or
> something new?
>
> I'm using a self-built Flink package 1.5-SNAPSHOT, flink commit
> 8395508b0401353ed07375e22882e7581d46ac0e which is not super old.
>
> Cheers,
> Juho
>
> 2018-06-06 10:01:33,498 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka version : 0.10.2.1
> 2018-06-06 10:01:33,498 INFO org.apache.kafka.common.utils.AppInfoParser - Kafka commitId : e89bffd6b2eff799
> 2018-06-06 10:01:33,560 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) for group aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-06 10:01:33,563 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Discovered coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) for group aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-07 22:08:28,773 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:28,776 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550, metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''}, topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''}, topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''}, topic2-1=OffsetAndMetadata{offset=89817267, metadata=''}, topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for group aggregate-all_server_measurements_combined-20180606-1000: Offset commit failed with a retriable exception. You should retry committing offsets.
> 2018-06-07 22:08:29,840 INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Marking the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 (id: 2147483550 rack: null) dead for group aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:29,841 WARN org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - Auto-commit of offsets {topic1-6=OffsetAndMetadata{offset=12298347875, metadata=''}, topic4-2=OffsetAndMetadata{offset=5492779112, metadata=''}, topic1-14=OffsetAndMetadata{offset=12299972108, metadata=''}} failed for group aggregate-all_server_measurements_combined-20180606-1000: Offset commit failed with a retriable exception. You should retry committing offsets.
>