Hi Aljoscha,

Thank you for the help and the reply.
1. I think we have finally pinpointed the root cause of this issue: when
partitions are assigned manually (i.e. with the assign() API instead of
the subscribe() API), the client will not try to rediscover the
coordinator if it dies [1]. This no longer seems to be an issue after
Kafka 1.1.0. After cherry-picking the PR [2] back to the Kafka 0.11.x
branch and packaging it with our Flink application, we haven't seen this
issue recur so far.

2. GROUP_OFFSETS is in fact the default startup mode when checkpointing
is not enabled - that's why I was a bit surprised that this problem was
reported so many times. To follow up on the question of "whether
resuming from GROUP_OFFSETS is useful": there are definitely use cases
where users don't want to use checkpointing (e.g. due to resource
constraints, storage cost considerations, etc.) but still want to avoid
a certain amount of data loss. Most of our analytics use cases fall into
this category.

(See the two sketches after the quoted mail below for what I mean by
manual assignment and by resuming from group offsets.)

--
Rong

[1] https://issues.apache.org/jira/browse/KAFKA-6362
[2] https://github.com/apache/kafka/pull/4326

On Wed, Mar 11, 2020 at 10:16 AM Aljoscha Krettek <aljos...@apache.org>
wrote:

> On 09.03.20 06:10, Rong Rong wrote:
> > - Is this feature (disabling checkpointing and restarting a job from
> > the Kafka committed GROUP_OFFSET) not supported?
>
> I believe the Flink community never put much (any?) effort into this
> because the Flink Kafka Consumer does its own offset handling. Starting
> from the committed offsets should work fine, though; the default
> startup mode is even StartupMode.GROUP_OFFSETS.
>
> > - How does Flink-Kafka actually handle auto-commit to the
> > coordinator, given that Flink ignores the coordinator assignments and
> > uses self-assigned partitions instead?
>
> I think we don't do anything for this case; the Kafka Consumer code
> will do the committing if 'enable.auto.commit' is set. I don't know how
> this will play with our code because we disable the automatic group
> handling.
>
> Do you think letting Kafka do the auto committing is ever useful? If
> you have a Flink job that does checkpoints you will get the correct
> offset committing and you can start a job from the committed offsets.
> In what cases would you want to use the built-in Kafka offset
> committing?
>
> Best,
> Aljoscha
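
P.S. Two small sketches to make the points above concrete. First, the
two assignment styles, against the plain Kafka 0.11.x consumer API (the
broker address, topic name, and group id are made-up placeholders).
subscribe() lets the group coordinator assign partitions; assign() is
manual assignment, which is roughly what the Flink Kafka consumer does
internally, and it is in this mode that the pre-fix 0.11.x client would
not rediscover a dead coordinator [1], so its periodic offset commits
silently stopped:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignVsSubscribe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // placeholder
        props.put("group.id", "my-analytics-group");       // placeholder
        props.put("key.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
            "org.apache.kafka.common.serialization.StringDeserializer");
        // The periodic auto-commit that point 2 relies on when
        // checkpointing is off.
        props.put("enable.auto.commit", "true");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        // Style 1: subscribe() - the group coordinator assigns
        // partitions and the client keeps track of the coordinator.
        // consumer.subscribe(Collections.singletonList("my-topic"));

        // Style 2: assign() - manual assignment, no group rebalancing.
        // This is where KAFKA-6362 [1] bites: if the coordinator dies,
        // the pre-fix client never looks for a new one, so auto-commit
        // stops working.
        consumer.assign(Collections.singletonList(
            new TopicPartition("my-topic", 0)));

        while (true) {
            // poll(long) is the pre-2.0 signature matching 0.11.x.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("offset=%d value=%s%n",
                    record.offset(), record.value());
            }
        }
    }
}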
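
Second, a sketch of what I mean by resuming from GROUP_OFFSETS without
checkpointing (again, names and addresses are placeholders; this assumes
the universal FlinkKafkaConsumer from a recent Flink 1.x). With no
checkpointing enabled, the only offset commits come from the Kafka
client's own periodic auto-commit, which is why the coordinator bug
above left us with stale group offsets on restart:

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class GroupOffsetsJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();
        // Deliberately no env.enableCheckpointing(...): offsets are only
        // committed through the Kafka client's auto-commit mechanism.

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
        props.setProperty("group.id", "my-analytics-group");      // placeholder
        // With checkpointing disabled, these two properties control how
        // often the group offsets advance.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");

        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
            "my-topic", new SimpleStringSchema(), props);
        // GROUP_OFFSETS is the default; spelled out here for clarity.
        // On restart the job resumes from the last committed group
        // offset, bounding (but not eliminating) loss or reprocessing.
        consumer.setStartFromGroupOffsets();

        env.addSource(consumer).print();
        env.execute("resume-from-group-offsets");
    }
}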