Hi,

I have the 3 consumers subscribing from a single topic with the following
settings.

auto.commit.interval.ms = 1000
enable.auto.commit      = true
request.timeout.ms      = 60000
session.timeout.ms      = 59000
heartbeat.interval.ms   = 4000

Version 0.9.0.

The consumers looks like this:

while (isRunning()) {
    ConsumerRecords<String, byte[]> records = consumer.poll(3000);

    for (final ConsumerRecord<String, byte[]> record : records) {
        doWork(...);
    }

    if (!records.isEmpty()) {
        log.info(String.format("Committed(Records = '%d', Offset = '%d',
Time = '%dms')", records.count(),
                Iterables.getLast(records).offset(),
                TimeUnit.MILLISECONDS.convert(elapsedTime,
TimeUnit.NANOSECONDS)));
    }
}

My problem is that the offset sometimes is moved back a lot during a
rebalance. This is the log from the consumers during a rebalance.

Consumer 1:
[2016-03-17 04:06:55,823] Committed(Records = '4', Offset = '427432', Time
= '3090ms')
[2016-03-17 04:06:55,823]
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Error
UNKNOWN_MEMBER_ID occurred while committing offsets for group default
[2016-03-17 04:06:55,823]
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Auto
offset commit failed: Commit cannot be completed due to group rebalance
[2016-03-17 04:06:55,824]
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Error
UNKNOWN_MEMBER_ID occurred while committing offsets for group default
[2016-03-17 04:06:55,824]
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Auto
offset commit failed: Commit cannot be completed due to group rebalance
[2016-03-17 04:06:55,825]
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Error
UNKNOWN_MEMBER_ID occurred while committing offsets for group default
[2016-03-17 04:06:55,825]
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator: Auto
offset commit failed:
[2016-03-17 04:06:55,825]
org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Attempt to
join group default failed due to unknown member id, resetting and retrying.
[2016-03-17 04:07:11,463] Committed(Records = '4', Offset = '427412', Time
= '1ms')

Offset before rebalance: 427432
Offset after rebalance: 427412
Diff: 20

Consumer 2:
INFO  [2016-03-17 04:06:52,299] Committed(Records = '1', Offset = '427444',
Time = '0ms')
INFO  [2016-03-17 04:06:52,573]
org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Attempt to
heart beat failed since the group is rebalancing, try to re-join group.
INFO  [2016-03-17 04:07:11,464] Committed(Records = '4', Offset = '427430',
Time = '0ms')

Offset before rebalance: 427444
Offset after rebalance: 427430
Diff: 14

Consumer 3:
[2016-03-17 04:07:07,254] Committed(Records = '4', Offset = '427408', Time
= '1674ms')
[2016-03-17 04:07:07,254]
org.apache.kafka.clients.consumer.internals.AbstractCoordinator: Attempt to
heart beat failed since the group is rebalancing, try to re-join group.
[2016-03-17 04:07:13,811] Committed(Records = '4', Offset = '412777', Time
= '2345ms')

Offset before rebalance: 427410
Offset after rebalance: 412777
Diff: 14 633. This is causing trouble for me.


Any idea on what I'm doing wrong?

/Erik

Reply via email to