[ 
https://issues.apache.org/jira/browse/KAFKA-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gregory Koshelev updated KAFKA-8104:
------------------------------------
    Description: 
TL;DR; {{KafkaConsumer}} cannot rejoin the group due to an inconsistent 
{{AbstractCoordinator.generation}} (which is {{NO_GENERATION}}) and 
{{AbstractCoordinator.joinFuture}} (which is a succeeded {{RequestFuture}}). See 
the explanation below.

There are 16 consumers in a single process (threads pool-4-thread-1 through 
pool-4-thread-16). All of them belong to the single consumer group 
{{hercules.sink.elastic.legacy_logs_elk_c2}}. A rebalance occurred and the 
consumers got {{CommitFailedException}} as expected:

{noformat}
2019-03-10T03:16:37.023Z [pool-4-thread-10] WARN  r.k.vostok.hercules.sink.SimpleSink - Commit failed due to rebalancing
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1334)
        at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1298)
        at ru.kontur.vostok.hercules.sink.Sink.commit(Sink.java:156)
        at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:104)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{noformat}
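For reference, the mitigation suggested in the exception message maps to two consumer settings. A minimal sketch, using plain property keys; the numeric values here are illustrative assumptions, not recommendations:

```java
import java.util.Properties;

public class ConsumerTuning {
    // Hypothetical helper building the two settings named in the exception message.
    public static Properties tunedProps() {
        Properties props = new Properties();
        // Give the poll loop more time before the broker evicts the member...
        props.put("max.poll.interval.ms", "600000");
        // ...or shrink the batch returned by poll() so processing finishes sooner.
        props.put("max.poll.records", "100");
        return props;
    }
}
```

These properties would be merged into the consumer's configuration before constructing the {{KafkaConsumer}}.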

After that, most of them successfully rejoined the group with generation 
10699:
{noformat}
2019-03-10T03:16:39.208Z [pool-4-thread-13] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-13, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation 10699
2019-03-10T03:16:39.209Z [pool-4-thread-13] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-13, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-18]
...
2019-03-10T03:16:39.216Z [pool-4-thread-11] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-11, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation 10699
2019-03-10T03:16:39.217Z [pool-4-thread-11] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-11, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-10, legacy_logs_elk_c2-11]
...
2019-03-10T03:16:39.218Z [pool-4-thread-15] INFO  o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-15, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-24]
2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-6, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
2019-03-10T03:16:42.323Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-7, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
2019-03-10T03:17:13.235Z [pool-4-thread-4] INFO  o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation -1

{noformat}


But one consumer (pool-4-thread-4) got the strange generation -1 (see the last 
log record above). Further log records are in the attached log file.

Finally, 15 consumers successfully rejoined, but the consumer on thread 
{{pool-4-thread-4}} did not:

{noformat}
2019-03-10T03:17:13.355Z [pool-4-thread-4] ERROR r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
        at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
        at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
2019-03-10T03:17:13.360Z [pool-4-thread-4] ERROR r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
        at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
        at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
{noformat}

It is important to note that {{KafkaConsumer.coordinator.joinFuture}} is not 
null and has succeeded, but {{ConsumerCoordinator}} cannot perform 
{{resetJoinGroupFuture()}} because an exception was thrown from 
{{onJoinComplete()}}:
{code:java}
            if (future.succeeded()) {
                // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
                ByteBuffer memberAssignment = future.value().duplicate();
                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);

                // We reset the join group future only after the completion callback returns. This ensures
                // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
                resetJoinGroupFuture();
                needsJoinPrepare = true;
            }
{code}

If I understand correctly, the generation was changed to {{NO_GENERATION}} in 
another thread by one of the {{CoordinatorResponseHandler}}s.
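A toy model of the suspected interleaving (hypothetical classes, not the actual Kafka internals): the heartbeat thread resets the generation to {{NO_GENERATION}} after the join future has already succeeded, so the polling thread's {{onJoinComplete()}} sees a null protocol, throws, and the {{resetJoinGroupFuture()}} step is never reached:

```java
import java.util.concurrent.atomic.AtomicReference;

// Simplified stand-in for AbstractCoordinator state; names mirror the report,
// behavior is a sketch of the hypothesized race, not the real implementation.
public class GenerationRaceSketch {
    public static final class Generation {
        public final int generationId;
        public final String protocol;
        public Generation(int generationId, String protocol) {
            this.generationId = generationId;
            this.protocol = protocol;
        }
    }

    // NO_GENERATION has generationId -1 and a null protocol,
    // matching the "Successfully joined group with generation -1" log line.
    public static final Generation NO_GENERATION = new Generation(-1, null);

    public final AtomicReference<Generation> generation =
            new AtomicReference<>(new Generation(10699, "range"));
    public volatile boolean joinFutureSucceeded = true; // joinFuture already completed

    // Heartbeat thread: error response handler wipes the generation.
    public void resetGeneration() {
        generation.set(NO_GENERATION);
    }

    // Polling thread: runs after the join future succeeds.
    public void onJoinComplete() {
        Generation g = generation.get(); // may already be NO_GENERATION
        if (g.protocol == null) {
            throw new IllegalStateException(
                    "Coordinator selected invalid assignment protocol: null");
        }
        joinFutureSucceeded = false; // resetJoinGroupFuture() analogue; unreachable in the race
    }
}
```

If {{resetGeneration()}} runs between the future succeeding and {{onJoinComplete()}}, the exception fires and the succeeded future is never cleared, which would explain why the consumer keeps failing on every subsequent {{poll()}}.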
 
 [^consumer-rejoin-fail.log] 


> Consumer cannot rejoin to the group after rebalancing
> -----------------------------------------------------
>
>                 Key: KAFKA-8104
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8104
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>    Affects Versions: 2.0.0
>            Reporter: Gregory Koshelev
>            Priority: Major
>         Attachments: consumer-rejoin-fail.log
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
