Hi,

What’s your KafkaConsumer configuration? Especially values for:
- is checkpointing enabled?
- enable.auto.commit (or auto.commit.enable for Kafka 0.8) / 
auto.commit.interval.ms
- did you set setCommitOffsetsOnCheckpoints() ?

Please also refer to 
https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration
 
<https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/connectors/kafka.html#kafka-consumers-offset-committing-behaviour-configuration>
 , especially this part:

> Note that the Flink Kafka Consumer does not rely on the committed offsets for 
> fault tolerance guarantees. The committed offsets are only a means to expose 
> the consumer’s progress for monitoring purposes.

Can you post full logs from all TaskManagers/JobManager and can you 
say/estimate when did the committing brake/stop? Did you check Kafka logs for 
any errors?

To me it seems more like a Kafka issue/bug:
https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188
 
<https://community.cloudera.com/t5/Data-Ingestion-Integration/Cannot-auto-commit-offsets-for-group-console-consumer-79720/m-p/51188>
https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232
 
<https://stackoverflow.com/questions/42362911/kafka-high-level-consumer-error-code-15/42416232#42416232>
Especially that in your case this offsets committing is superseded by Kafka 
coordinator failure.

Piotrek

> On 8 Jun 2018, at 10:05, Juho Autio <juho.au...@rovio.com> wrote:
> 
> Hi,
> 
> We have a Flink stream job that uses Flink kafka consumer. Normally it 
> commits consumer offsets to Kafka.
> 
> However this stream ended up in a state where it's otherwise working just 
> fine, but it isn't committing offsets to Kafka any more. The job keeps 
> writing correct aggregation results to the sink, though. At the time of 
> writing this, the job has been running 14 hours without committing offsets.
> 
> Below is an extract from taskmanager.log. As you can see, it didn't log 
> anything until ~2018-06-07 22:08. Also that's where the log ends, these are 
> the last lines so far.
> 
> Could you help check if this is a know bug, possibly already fixed, or 
> something new?
> 
> I'm using a self-built Flink package 1.5-SNAPSHOT, flink commit 
> 8395508b0401353ed07375e22882e7581d46ac0e which is not super old.
> 
> Cheers,
> Juho
> 
> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser     
>               - Kafka version : 0.10.2.1
> 2018-06-06 10:01:33,498 INFO  org.apache.kafka.common.utils.AppInfoParser     
>               - Kafka commitId : e89bffd6b2eff799
> 2018-06-06 10:01:33,560 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 
> <http://my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092/> (id: 
> 2147483550 rack: null) for group 
> aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-06 10:01:33,563 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Discovered 
> coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 
> <http://my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092/> (id: 
> 2147483550 rack: null) for group 
> aggregate-all_server_measurements_combined-20180606-1000.
> 2018-06-07 22:08:28,773 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 
> <http://my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092/> (id: 
> 2147483550 rack: null) dead for group 
> aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:28,776 WARN  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - 
> Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550, 
> metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444, metadata=''}, 
> topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''}, 
> topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''}, 
> topic2-1=OffsetAndMetadata{offset=89817267, metadata=''}, 
> topic1-10=OffsetAndMetadata{offset=12299742352, metadata=''}} failed for 
> group aggregate-all_server_measurements_combined-20180606-1000: Offset commit 
> failed with a retriable exception. You should retry committing offsets.
> 2018-06-07 22:08:29,840 INFO  
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking 
> the coordinator my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092 
> <http://my-kafka-host-10-1-16-97.cloud-internal.mycompany.com:9092/> (id: 
> 2147483550 rack: null) dead for group 
> aggregate-all_server_measurements_combined-20180606-1000
> 2018-06-07 22:08:29,841 WARN  
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  - 
> Auto-commit of offsets {topic1-6=OffsetAndMetadata{offset=12298347875, 
> metadata=''}, topic4-2=OffsetAndMetadata{offset=5492779112, metadata=''}, 
> topic1-14=OffsetAndMetadata{offset=12299972108, metadata=''}} failed for 
> group aggregate-all_server_measurements_combined-20180606-1000: Offset commit 
> failed with a retriable exception. You should retry committing offsets.
> 

Reply via email to