[ https://issues.apache.org/jira/browse/KAFKA-8104?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16946114#comment-16946114 ]
ASF GitHub Bot commented on KAFKA-8104:
---------------------------------------

nizhikov commented on pull request #7460: KAFKA-8104: Consumer cannot rejoin to the group after rebalancing
URL: https://github.com/apache/kafka/pull/7460

This PR fixes a race condition between the consumer thread and the consumer coordinator heartbeat thread. The bug reproduces in many production environments.

Conditions for reproducing:

1. The consumer thread initiates a rejoin to the group because of a commit timeout. It calls `AbstractCoordinator#joinGroupIfNeeded`, which leads to `sendJoinGroupRequest`.
2. `JoinGroupResponseHandler` writes the new generation data to `AbstractCoordinator.this.generation` and leaves the `synchronized` section.
3. The heartbeat thread executes `maybeLeaveGroup` and clears the generation data via `resetGenerationOnLeaveGroup`.
4. The consumer thread executes `onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);` with the cleared generation data. This leads to the exception reported in the issue.

The race is fixed with a condition in `maybeLeaveGroup`: if a rejoin is in progress in the consumer thread, there is no reason to reset the generation data and send a `LeaveGroupRequest` from the heartbeat thread.

This PR contains an "unfair" reproducer. It is implemented with a `CountDownLatch` that imitates the described race in the `AbstractCoordinator` code. I need assistance with how a fair reproducer should be implemented.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

> Consumer cannot rejoin to the group after rebalancing
> -----------------------------------------------------
>
>                 Key: KAFKA-8104
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8104
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>   Affects Versions: 2.0.0, 2.1.0, 2.2.0, 2.3.0
>            Reporter: Gregory Koshelev
>            Assignee: Nikolay Izhikov
>            Priority: Critical
>         Attachments: consumer-rejoin-fail.log
>
>
> TL;DR: {{KafkaConsumer}} cannot rejoin the group because {{AbstractCoordinator.generation}} (which is {{NO_GENERATION}}) is inconsistent with {{AbstractCoordinator.joinFuture}} (which is a succeeded {{RequestFuture}}). See the explanation below.
> There are 16 consumers in a single process (threads from pool-4-thread-1 to pool-4-thread-16). All of them belong to a single consumer group, {{hercules.sink.elastic.legacy_logs_elk_c2}}. A rebalance occurred and the consumers got {{CommitFailedException}} as expected:
> {noformat}
> 2019-03-10T03:16:37.023Z [pool-4-thread-10] WARN r.k.vostok.hercules.sink.SimpleSink - Commit failed due to rebalancing
> org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:798)
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:681)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1334)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1298)
>       at ru.kontur.vostok.hercules.sink.Sink.commit(Sink.java:156)
>       at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:104)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {noformat}
> After that, most of them successfully rejoined the group with generation 10699:
> {noformat}
> 2019-03-10T03:16:39.208Z [pool-4-thread-13] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-13, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation 10699
> 2019-03-10T03:16:39.209Z [pool-4-thread-13] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-13, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-18]
> ...
> 2019-03-10T03:16:39.216Z [pool-4-thread-11] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-11, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation 10699
> 2019-03-10T03:16:39.217Z [pool-4-thread-11] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-11, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-10, legacy_logs_elk_c2-11]
> ...
> 2019-03-10T03:16:39.218Z [pool-4-thread-15] INFO o.a.k.c.c.i.ConsumerCoordinator - [Consumer clientId=consumer-15, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Setting newly assigned partitions [legacy_logs_elk_c2-24]
> 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-6, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
> 2019-03-10T03:16:42.320Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-5, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
> 2019-03-10T03:16:42.323Z [kafka-coordinator-heartbeat-thread | hercules.sink.elastic.legacy_logs_elk_c2] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-7, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Attempt to heartbeat failed since group is rebalancing
> 2019-03-10T03:17:13.235Z [pool-4-thread-4] INFO o.a.k.c.c.i.AbstractCoordinator - [Consumer clientId=consumer-4, groupId=hercules.sink.elastic.legacy_logs_elk_c2] Successfully joined group with generation -1
> {noformat}
> But one consumer (pool-4-thread-4) got a strange generation of -1 (see the last log record above).
> Further log records are in the attached log file.
> Finally, 15 consumers successfully rejoined.
> But the consumer on thread {{pool-4-thread-4}} did not rejoin:
> {noformat}
> 2019-03-10T03:17:13.355Z [pool-4-thread-4] ERROR r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
> java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
>       at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
>       at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
>       at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
>       at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
>       at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> 2019-03-10T03:17:13.360Z [pool-4-thread-4] ERROR r.k.vostok.hercules.sink.SimpleSink - Unspecified exception has been acquired
> java.lang.IllegalStateException: Coordinator selected invalid assignment protocol: null
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:241)
>       at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:422)
>       at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:352)
>       at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:337)
>       at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:333)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1175)
>       at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1154)
>       at ru.kontur.vostok.hercules.sink.Sink.poll(Sink.java:152)
>       at ru.kontur.vostok.hercules.sink.SimpleSink.run(SimpleSink.java:70)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {noformat}
> It is important to note that {{KafkaConsumer.coordinator.joinFuture}} is not null and has succeeded, but {{ConsumerCoordinator}} cannot perform {{resetJoinGroupFuture()}} because an exception was thrown from {{onJoinComplete()}}:
> {code:java}
>             if (future.succeeded()) {
>                 // Duplicate the buffer in case `onJoinComplete` does not complete and needs to be retried.
>                 ByteBuffer memberAssignment = future.value().duplicate();
>                 onJoinComplete(generation.generationId, generation.memberId, generation.protocol, memberAssignment);
>
>                 // We reset the join group future only after the completion callback returns. This ensures
>                 // that if the callback is woken up, we will retry it on the next joinGroupIfNeeded.
>                 resetJoinGroupFuture();
>                 needsJoinPrepare = true;
>             }
> {code}
> If I understand correctly, the generation was changed to {{NO_GENERATION}} in another thread by one of the CoordinatorResponseHandlers.
>
> [^consumer-rejoin-fail.log]

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
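
The four-step interleaving listed in the pull request comment can be replayed deterministically in a small model, which may help when discussing the guard in `maybeLeaveGroup`. The sketch below is not Kafka code and is not the patch from the PR: `RejoinRaceSketch`, `rejoinInProgress`, `guardEnabled`, `beginRejoin`, `onJoinResponse`, and `completeJoin` are hypothetical names, and the member id and protocol values are illustrative. Like the latch-based reproducer mentioned in the comment, it forces the order of events (here by running the steps sequentially on one thread) rather than waiting for the race to occur naturally.

```java
final class RejoinRaceSketch {

    /** Immutable stand-in for the coordinator's generation data (hypothetical model). */
    static final class Generation {
        static final Generation NO_GENERATION = new Generation(-1, "", null);

        final int generationId;
        final String memberId;
        final String protocol;

        Generation(int generationId, String memberId, String protocol) {
            this.generationId = generationId;
            this.memberId = memberId;
            this.protocol = protocol;
        }
    }

    /** Simplified coordinator that models only the state involved in the race. */
    static final class Coordinator {
        private final boolean guardEnabled;
        private Generation generation = Generation.NO_GENERATION;
        private boolean rejoinInProgress = false;

        Coordinator(boolean guardEnabled) {
            this.guardEnabled = guardEnabled;
        }

        // Step 1: the consumer thread starts a rejoin.
        synchronized void beginRejoin() {
            rejoinInProgress = true;
        }

        // Step 2: the join response handler stores the new generation.
        synchronized void onJoinResponse(Generation newGeneration) {
            generation = newGeneration;
        }

        // Step 3: the heartbeat thread tries to leave the group.
        // With the guard enabled, it does nothing while a rejoin is in progress.
        synchronized boolean maybeLeaveGroup() {
            if (guardEnabled && rejoinInProgress) {
                return false;
            }
            generation = Generation.NO_GENERATION;
            return true;
        }

        // Step 4: the consumer thread completes the join using the stored generation.
        synchronized String completeJoin() {
            Generation snapshot = generation;
            if (snapshot.protocol == null) {
                throw new IllegalStateException(
                        "Coordinator selected invalid assignment protocol: " + snapshot.protocol);
            }
            rejoinInProgress = false;
            return snapshot.protocol;
        }
    }

    /** Replays steps 1 to 4 in the order described in the PR comment. */
    static String runInterleaving(boolean guardEnabled) {
        Coordinator coordinator = new Coordinator(guardEnabled);
        coordinator.beginRejoin();
        // Generation id taken from the log above; member id and protocol are illustrative.
        coordinator.onJoinResponse(new Generation(10699, "member-4", "range"));
        coordinator.maybeLeaveGroup();
        return coordinator.completeJoin();
    }

    public static void main(String[] args) {
        System.out.println("with guard: protocol=" + runInterleaving(true));
        try {
            runInterleaving(false);
        } catch (IllegalStateException e) {
            System.out.println("without guard: " + e.getMessage());
        }
    }
}
```

In this model, disabling the guard lets step 3 wipe the generation before step 4 reads it, so `completeJoin` fails in the same way as `onJoinComplete` in the stack trace. Enabling the guard makes step 3 a no-op while the rejoin is pending.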