Hi Edward,
looking through the Kafka code, I do see a path where they deliberately
do not want recursive retries, i.e. if the coordinator is unknown. It
seems like you are getting into this scenario.

I'm no expert on Kafka and therefore I'm not sure on the implications or
ways to circumvent/fix this, maybe the Kafka folks can help you with
this on their mailing list or Gordon (cc'd) knows - although this seems
Flink-unrelated.

Regarding the use of OffsetCommitMode.ON_CHECKPOINTS: I looked at our
code and with this (@Gordon, please correct me if I'm wrong), we will
commit the offsets ourselves and will try to commit every time a
checkpoint completes. In case of a failure in the last commit, we will
simply commit the new one instead with the next checkpoint.


Nico

On 05/03/18 17:11, Edward wrote:
> We have noticed that the Kafka offset auto-commit functionality seems to stop
> working after it encounters a timeout. It appears in the logs like this:
> 
> 2018-03-04 07:02:54,779 INFO 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
> the coordinator kafka06:9092 (id: 2147483641 rack: null) dead for group
> consumergroup01
> 2018-03-04 07:02:54,780 WARN 
> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -
> Auto-commit of offsets {topic01-24=OffsetAndMetadata{offset=153237895,
> metadata=''}} failed for group consumergroup01: Offset commit failed with a
> retriable exception. You should retry committing offsets. The underlying
> error was: The request timed out.
> 
> After this message is logged, no more offsets are committed by the job until
> it is restarted (and if the flink process ends abnormally, the offsets never
> get committed).
> 
> This is using Flink 1.4.0 which uses kafka-clients 0.11.0.2. We are using
> the default kafka client settings for enable.auto.commit (true) and
> auto.commit.interval.ms (5000). We are not using Flink checkpointing, so the
> kafka client offset commit mode is OffsetCommitMode.KAFKA_PERIODIC (not
> OffsetCommitMode.ON_CHECKPOINTS).
> 
> I'm wondering if others have encountered this?
> 
> And if so, does enabling checkpointing resolve the issue, because
> Kafka09Fetcher.doCommitInternalOffsetsToKafka is called from the Flink code?
> 
> 
> 
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to