Your Kafka consumer is probably rebalancing. This can happen when message
processing takes so long that the Kafka broker marks your consumer dead and
triggers a rebalance. This all happens before the consumer can commit the

On Mon, Jun 11, 2018 at 7:37 PM Piotr Nowojski <> wrote:

> The more I look into it, the more it seems like a Kafka bug or some
> cluster failure from which your Kafka cluster did not recover.
> In your case auto-commit should be enabled, and then the
> KafkaConsumer should commit offsets every so often while it is polling
> messages. Unless, for example, `coordinatorUnknown()` returns true in
> `org.apache.kafka.clients.consumer.internals.ConsumerCoordinator#maybeAutoCommitOffsetsAsync`
> (Kafka code base):
> private void maybeAutoCommitOffsetsAsync(long now) {
>     if (autoCommitEnabled) {
>         if (coordinatorUnknown()) {
>             this.nextAutoCommitDeadline = now + retryBackoffMs;
>         } else if (now >= nextAutoCommitDeadline) {
>             this.nextAutoCommitDeadline = now + autoCommitIntervalMs;
>             doAutoCommitOffsetsAsync();
>         }
>     }
> }
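
To make the effect of that branch concrete, here is a minimal, self-contained sketch of the same scheduling logic. It is not the real ConsumerCoordinator: the coordinator state is replaced by a plain boolean flag, the commit is only counted, and the 100 ms backoff and 5000 ms interval are assumed values chosen for illustration.

```java
// Simplified stand-in for the auto-commit scheduling quoted above.
// Assumption: the coordinator state is a plain flag, not a real broker connection.
final class AutoCommitSketch {
    private final long retryBackoffMs;
    private final long autoCommitIntervalMs;
    private long nextAutoCommitDeadline;
    private int commitsSent = 0;
    boolean coordinatorUnknown = false; // hypothetical replacement for coordinatorUnknown()

    AutoCommitSketch(long retryBackoffMs, long autoCommitIntervalMs) {
        this.retryBackoffMs = retryBackoffMs;
        this.autoCommitIntervalMs = autoCommitIntervalMs;
        this.nextAutoCommitDeadline = autoCommitIntervalMs;
    }

    // Same branching as the quoted method; the commit itself is only counted.
    void maybeAutoCommit(long now) {
        if (coordinatorUnknown) {
            // The deadline keeps moving forward, so no commit is attempted.
            nextAutoCommitDeadline = now + retryBackoffMs;
        } else if (now >= nextAutoCommitDeadline) {
            nextAutoCommitDeadline = now + autoCommitIntervalMs;
            commitsSent++;
        }
    }

    int commitsSent() {
        return commitsSent;
    }

    // Calls maybeAutoCommit once per pollEveryMs of simulated time.
    static int simulate(boolean coordinatorUnknown, long durationMs, long pollEveryMs) {
        AutoCommitSketch sketch = new AutoCommitSketch(100L, 5000L);
        sketch.coordinatorUnknown = coordinatorUnknown;
        for (long now = 0; now <= durationMs; now += pollEveryMs) {
            sketch.maybeAutoCommit(now);
        }
        return sketch.commitsSent();
    }

    public static void main(String[] args) {
        System.out.println("coordinator known:   " + simulate(false, 60_000L, 50L));
        System.out.println("coordinator unknown: " + simulate(true, 60_000L, 50L));
    }
}
```

As long as the flag stays set, the deadline is pushed back on every call and the commit counter never moves, which would match a job that keeps consuming but stops committing.
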
> Have you checked Kafka logs? This suggests that the real problem is hidden
> behind:
> >  INFO  org.apache.kafka.clients.consumer.internals.AbstractCoordinator
>  - Marking the coordinator my-kafka-host-10-1-16-
> (id: 2147483550 rack: null) dead for group
> aggregate-all_server_measurements_combined-20180606-1000
> And maybe your Kafka cluster/consumer cannot recover from this situation.
> Another (simpler) thing to try is just upgrading the Kafka cluster.
> Piotrek
> On 11 Jun 2018, at 11:44, Juho Autio <> wrote:
> Hi Piotr, thanks for your insights.
> > What’s your KafkaConsumer configuration?
> We only set these in the properties that are passed to
> FlinkKafkaConsumer010 constructor:
> auto.offset.reset=latest
> bootstrap.servers=my-kafka-host:9092
> flink.partition-discovery.interval-millis=30000
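
For reference, a small sketch of those properties built in Java, with the two auto-commit settings spelled out. The last two entries are an assumption on my part: they are not set in this thread, and the values shown are the Kafka consumer defaults as I understand them, which would also fit the "every 5 seconds" observation.

```java
import java.util.Properties;

// Builds the consumer properties quoted in this thread, plus explicit auto-commit settings.
final class ConsumerPropsSketch {
    static Properties consumerProperties() {
        Properties props = new Properties();
        // Settings quoted in this thread.
        props.setProperty("auto.offset.reset", "latest");
        props.setProperty("bootstrap.servers", "my-kafka-host:9092");
        props.setProperty("flink.partition-discovery.interval-millis", "30000");
        // Assumption: not set in the thread; shown with what I believe are the client defaults.
        props.setProperty("enable.auto.commit", "true");
        props.setProperty("auto.commit.interval.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        consumerProperties().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Setting them explicitly does not change behaviour if the defaults are what I assume, but it makes the intended commit cadence visible in the job configuration.
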
> > is checkpointing enabled?
> No.
> > (or auto.commit.enable for Kafka 0.8) /
> We have whatever is the default behaviour of Flink kafka consumer. It
> seems to commit quite often, something like every 5 seconds.
> > did you set setCommitOffsetsOnCheckpoints() ?
> No. But I checked with debugger that
> apparently enableCommitOnCheckpoints=true is the default.
> I also checked with debugger that offsetCommitMode=KAFKA_PERIODIC.
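
That combination is what I would expect. Below is a rough sketch of how the commit mode follows from the three flags, based on my reading of the Flink Kafka connector documentation. The enum and method here are local stand-ins written for illustration, not Flink's own classes.

```java
// Illustrative stand-in for how the offset commit mode is chosen.
// Assumption: this mirrors the documented behaviour, it is not Flink's source code.
final class OffsetCommitModeSketch {
    enum Mode { DISABLED, ON_CHECKPOINTS, KAFKA_PERIODIC }

    static Mode fromConfiguration(boolean enableAutoCommit,
                                  boolean enableCommitOnCheckpoints,
                                  boolean checkpointingEnabled) {
        if (checkpointingEnabled) {
            // With checkpointing on, the Kafka client's auto-commit setting is ignored.
            return enableCommitOnCheckpoints ? Mode.ON_CHECKPOINTS : Mode.DISABLED;
        }
        // Without checkpointing, committing is left to the Kafka client's periodic auto-commit.
        return enableAutoCommit ? Mode.KAFKA_PERIODIC : Mode.DISABLED;
    }

    public static void main(String[] args) {
        // The situation described above: no checkpointing, default flags.
        System.out.println(fromConfiguration(true, true, false));
    }
}
```

So with checkpointing disabled, enableCommitOnCheckpoints=true has no effect, and the committing is done entirely by the Kafka client.
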
> So I guess you're right that this bug doesn't seem to be in Flink itself?
> I wonder if it's a known issue in Kafka client lib..
> I also took a thread dump on one of the task managers in this broken state.
> But I couldn't spot anything obvious when comparing the threads to a dump
> from a job where offsets are being committed. Anyway, I've saved the thread
> dump in case there's something to look for specifically.
> Sharing the full logs of job & task managers would be a bit of a hassle,
> because I don't have an automatic way to obfuscate the logs so that I'm
> sure that there isn't anything sensitive left. Anyway, there isn't
> anything else to share really. I wrote: "As you can see, it didn't log
> anything until ~2018-06-07 22:08. Also that's where the log ends".
> Thanks once more.
> On Mon, Jun 11, 2018 at 11:18 AM, Piotr Nowojski <>
> wrote:
>> Hi,
>> What’s your KafkaConsumer configuration? Especially values for:
>> - is checkpointing enabled?
>> - (or auto.commit.enable for Kafka 0.8) /
>> - did you set setCommitOffsetsOnCheckpoints() ?
>> Please also refer to
>>  ,
>> especially this part:
>> > Note that the Flink Kafka Consumer does not rely on the committed
>> offsets for fault tolerance guarantees. The committed offsets are only a
>> means to expose the consumer’s progress for monitoring purposes.
>> Can you post full logs from all TaskManagers/JobManager and can you
>> say/estimate when the committing broke/stopped? Did you check Kafka logs
>> for any errors?
>> To me it seems more like a Kafka issue/bug:
>> Especially since in your case the offset commit failure is preceded by a
>> Kafka coordinator failure.
>> Piotrek
>> On 8 Jun 2018, at 10:05, Juho Autio <> wrote:
>> Hi,
>> We have a Flink stream job that uses Flink kafka consumer. Normally it
>> commits consumer offsets to Kafka.
>> However this stream ended up in a state where it's otherwise working just
>> fine, but it isn't committing offsets to Kafka any more. The job keeps
>> writing correct aggregation results to the sink, though. At the time of
>> writing this, the job has been running 14 hours without committing offsets.
>> Below is an extract from taskmanager.log. As you can see, it didn't log
>> anything until ~2018-06-07 22:08. Also that's where the log ends, these are
>> the last lines so far.
>> Could you help check if this is a known bug, possibly already fixed, or
>> something new?
>> I'm using a self-built Flink package 1.5-SNAPSHOT, flink commit
>> 8395508b0401353ed07375e22882e7581d46ac0e which is not super old.
>> Cheers,
>> Juho
>> 2018-06-06 10:01:33,498 INFO
>> org.apache.kafka.common.utils.AppInfoParser                   - Kafka
>> version :
>> 2018-06-06 10:01:33,498 INFO
>> org.apache.kafka.common.utils.AppInfoParser                   - Kafka
>> commitId : e89bffd6b2eff799
>> 2018-06-06 10:01:33,560 INFO
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  -
>> Discovered coordinator
>> (id: 2147483550 rack: null) for group
>> aggregate-all_server_measurements_combined-20180606-1000.
>> 2018-06-06 10:01:33,563 INFO
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  -
>> Discovered coordinator
>> (id: 2147483550 rack: null) for group
>> aggregate-all_server_measurements_combined-20180606-1000.
>> 2018-06-07 22:08:28,773 INFO
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
>> the coordinator
>> (id: 2147483550 rack: null) dead for group
>> aggregate-all_server_measurements_combined-20180606-1000
>> 2018-06-07 22:08:28,776 WARN
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -
>> Auto-commit of offsets {topic1-2=OffsetAndMetadata{offset=12300395550,
>> metadata=''}, topic1-18=OffsetAndMetadata{offset=12299210444,
>> metadata=''}, topic3-0=OffsetAndMetadata{offset=5064277287, metadata=''},
>> topic4-6=OffsetAndMetadata{offset=5492398559, metadata=''},
>> topic2-1=OffsetAndMetadata{offset=89817267, metadata=''},
>> topic1-10=OffsetAndMetadata{offset=12299742352,
>> metadata=''}} failed for group
>> aggregate-all_server_measurements_combined-20180606-1000: Offset commit
>> failed with a retriable exception. You should retry committing offsets.
>> 2018-06-07 22:08:29,840 INFO
>> org.apache.kafka.clients.consumer.internals.AbstractCoordinator  - Marking
>> the coordinator
>> (id: 2147483550 rack: null) dead for group
>> aggregate-all_server_measurements_combined-20180606-1000
>> 2018-06-07 22:08:29,841 WARN
>> org.apache.kafka.clients.consumer.internals.ConsumerCoordinator  -
>> Auto-commit of offsets {topic1-6=OffsetAndMetadata{offset=12298347875,
>> metadata=''},
>> topic4-2=OffsetAndMetadata{offset=5492779112, metadata=''},
>> topic1-14=OffsetAndMetadata{offset=12299972108,
>> metadata=''}} failed for group
>> aggregate-all_server_measurements_combined-20180606-1000: Offset commit
>> failed with a retriable exception. You should retry committing offsets.
