[https://issues.apache.org/jira/browse/KAFKA-12984?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17433653#comment-17433653]

Andrei D edited comment on KAFKA-12984 at 10/25/21, 12:31 PM:
--------------------------------------------------------------

The entire group used Kafka client 2.8.1 and 'CooperativeStickyAssignor'.
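For reference, a plain Java consumer selects this assignor through the standard `partition.assignment.strategy` setting. The sketch below only builds the `Properties` object, so it runs without the Kafka client on the classpath. The bootstrap address is a placeholder assumption, the group id is the one seen in the logs, and how the SmallRye application in this report actually passes the setting is not shown here.

```java
import java.util.Properties;

public class CooperativeConsumerConfig {
    /** Consumer properties that select the cooperative-sticky assignor. */
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address (assumption)
        props.put("group.id", "qa-qa-cf-executor-transform"); // group id seen in the logs
        // Fully qualified class name of the assignor shipped with kafka-clients.
        props.put("partition.assignment.strategy",
                "org.apache.kafka.clients.consumer.CooperativeStickyAssignor");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProps().getProperty("partition.assignment.strategy"));
    }
}
```

A real `KafkaConsumer` would also need `key.deserializer` and `value.deserializer` before it can be constructed.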


Here are the broker logs: we see that the broker skipped assignment for generations 
10-12 because the ConsumerCoordinator was stuck on the client side:

 

!image-2021-10-25-11-53-40-221.png!

And here are the consumer logs for the same time frame:


{code:java}
2021-10-20 10:14:27.878 ERROR {spanId=, traceId=} [org.apa.kaf.cli.con.int.ConsumerCoordinator] (smallrye-kafka-consumer-thread-0) [Consumer clientId=qa-qa-cf-executor-transform, groupId=qa-qa-cf-executor-transform] With the COOPERATIVE protocol, owned partitions cannot be reassigned to other members; however the assignor has reassigned partitions [qa-qa-cf-events-32, qa-qa-cf-events-13, qa-qa-cf-events-30, qa-qa-cf-events-38, qa-qa-cf-events-11] which are still owned by some members
2021-10-20 10:14:30.566 ERROR {spanId=, traceId=} [org.apa.kaf.cli.con.int.ConsumerCoordinator] (smallrye-kafka-consumer-thread-0) [Consumer clientId=qa-qa-cf-executor-transform, groupId=qa-qa-cf-executor-transform] With the COOPERATIVE protocol, owned partitions cannot be reassigned to other members; however the assignor has reassigned partitions [qa-qa-cf-events-32, qa-qa-cf-events-13, qa-qa-cf-events-30, qa-qa-cf-events-38, qa-qa-cf-events-11] which are still owned by some members
2021-10-20 10:14:34.913 ERROR {spanId=, traceId=} [org.apa.kaf.cli.con.int.ConsumerCoordinator] (smallrye-kafka-consumer-thread-0) [Consumer clientId=qa-qa-cf-executor-transform, groupId=qa-qa-cf-executor-transform] With the COOPERATIVE protocol, owned partitions cannot be reassigned to other members; however the assignor has reassigned partitions [qa-qa-cf-events-32, qa-qa-cf-events-13, qa-qa-cf-events-30, qa-qa-cf-events-38, qa-qa-cf-events-11] which are still owned by some members
2021-10-20 10:14:34.920 ERROR {spanId=, traceId=} [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18217: Unable to read a record from Kafka topics '[qa-qa-cf-events]': java.lang.IllegalStateException: Retries exhausted: 3/3
2021-10-20T13:14:34.928+03:00   Caused by: java.lang.IllegalStateException: Assignor supporting the COOPERATIVE protocol violates its requirements
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.validateCooperativeAssignment(ConsumerCoordinator.java:668)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:592)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:693)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1182)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1157)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:247)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:426)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:365)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:508)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1261)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1230)
2021-10-20T13:14:34.928+03:00   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
2021-10-20T13:14:34.928+03:00   at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.lambda$poll$4(ReactiveKafkaConsumer.java:131)
2021-10-20T13:14:34.928+03:00   at io.smallrye.reactive.messaging.kafka.impl.ReactiveKafkaConsumer.lambda$runOnPollingThread$0(ReactiveKafkaConsumer.java:101)
2021-10-20T13:14:34.928+03:00   at io.smallrye.context.impl.wrappers.SlowContextualSupplier.get(SlowContextualSupplier.java:21)
2021-10-20T13:14:34.928+03:00   at io.smallrye.mutiny.operators.uni.builders.UniCreateFromItemSupplier.subscribe(UniCreateFromItemSupplier.java:28)
2021-10-20 10:14:34.921 WARN  {spanId=, traceId=} [io.sma.rea.mes.kafka] (smallrye-kafka-consumer-thread-0) SRMSG18228: A failure has been reported for Kafka topics '[qa-qa-cf-events]': java.lang.IllegalStateException: Retries exhausted: 3/3
{code}
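The check that fires here rejects any assignment in which a partition moves directly from one member to another within a single rebalance: under the COOPERATIVE protocol the current owner must first revoke it, and the partition can only be handed out in a follow-up rebalance. The sketch below is a simplified, dependency-free approximation of that invariant as the error message describes it, not Kafka's source. Partitions are plain strings, members are keyed by hypothetical ids such as consumer-A, and the class and method names are made up for illustration.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class CooperativeAssignmentCheck {
    /**
     * Returns the partitions that the new assignment gives to some member in the same
     * round in which they are revoked from another member. Partitions are plain
     * strings (e.g. "qa-qa-cf-events-32") to keep the sketch dependency-free.
     */
    static Set<String> illegallyReassigned(Map<String, List<String>> owned,
                                           Map<String, List<String>> assigned) {
        Set<String> members = new HashSet<>(owned.keySet());
        members.addAll(assigned.keySet());

        Set<String> added = new HashSet<>();
        Set<String> revoked = new HashSet<>();
        for (String member : members) {
            List<String> before = owned.getOrDefault(member, List.of());
            List<String> after = assigned.getOrDefault(member, List.of());

            // Partitions this member gains in the new assignment.
            Set<String> gained = new HashSet<>(after);
            gained.removeAll(before);
            added.addAll(gained);

            // Partitions this member owned but must now give up.
            Set<String> lost = new HashSet<>(before);
            lost.removeAll(after);
            revoked.addAll(lost);
        }
        // Anything both gained and revoked in one round moved directly between members.
        added.retainAll(revoked);
        return added;
    }

    /** Throws, like the coordinator in the log above, when the invariant is broken. */
    static void validate(Map<String, List<String>> owned, Map<String, List<String>> assigned) {
        Set<String> moved = illegallyReassigned(owned, assigned);
        if (!moved.isEmpty()) {
            throw new IllegalStateException(
                    "Assignor supporting the COOPERATIVE protocol violates its requirements: " + moved);
        }
    }

    public static void main(String[] args) {
        // Hypothetical members: consumer-A owned partition 32, the assignor hands it to consumer-B.
        Map<String, List<String>> owned = Map.of(
                "consumer-A", List.of("qa-qa-cf-events-32"),
                "consumer-B", List.of());
        Map<String, List<String>> assigned = Map.of(
                "consumer-A", List.of(),
                "consumer-B", List.of("qa-qa-cf-events-32"));
        System.out.println(illegallyReassigned(owned, assigned)); // [qa-qa-cf-events-32]
    }
}
```

A partition that is only revoked in a round (owned by a member but assigned to nobody) passes this check; it is the simultaneous revoke-and-add that is rejected.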

UPD:
We ran more tests and, despite manually pulling in the 'CooperativeStickyAssignor' 
from the Kafka 3.0 client and using it in our application, we still see the same 
errors, so it seems the issue still exists.


Comment author: andy_dufresne


> Cooperative sticky assignor can get stuck with invalid SubscriptionState input metadata
> ---------------------------------------------------------------------------------------
>
>                 Key: KAFKA-12984
>                 URL: https://issues.apache.org/jira/browse/KAFKA-12984
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: A. Sophie Blee-Goldman
>            Assignee: A. Sophie Blee-Goldman
>            Priority: Blocker
>             Fix For: 2.8.1, 3.0.0
>
>         Attachments: image-2021-10-25-11-53-40-221.png
>
>
> Some users have reported seeing their consumer group become stuck in the 
> CompletingRebalance phase when using the cooperative-sticky assignor. Based 
> on the request metadata, we were able to deduce that multiple consumers were 
> reporting the same partition(s) in their "ownedPartitions" field of the 
> consumer protocol. Since this is an invalid state, the input causes the 
> cooperative-sticky assignor to detect that something is wrong and throw an 
> IllegalStateException. If the consumer application is set up to simply retry, 
> this will cause the group to appear to hang in the rebalance state.
> The "ownedPartitions" field is encoded based on the ConsumerCoordinator's 
> SubscriptionState, which was assumed to always be up to date. However, there 
> may be cases where the consumer has dropped out of the group but fails to 
> clear the SubscriptionState, allowing it to report some partitions as owned 
> that have since been reassigned to another member.
> We should (a) fix the sticky assignment algorithm to resolve cases of 
> improper input conditions by invalidating the "ownedPartitions" in cases of 
> double ownership, and (b) shore up the ConsumerCoordinator logic to better 
> handle rejoining the group and keeping its internal state consistent. See 
> KAFKA-12983 for more details on (b).
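Point (a) can be illustrated with a small standalone sketch: before the sticky assignment runs, any partition that appears in more than one member's "ownedPartitions" is dropped from all of them, so it is treated as unowned instead of triggering an IllegalStateException. This is only an illustration of the idea under stated assumptions, not the code of the actual fix: partitions are plain strings, the class and method names are hypothetical, and the generation information carried by the real consumer protocol is ignored.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class OwnedPartitionsSanitizer {
    /**
     * Returns a copy of the members' ownedPartitions in which every partition
     * claimed by more than one member is removed from all of them, so the
     * assignor treats it as unowned instead of failing.
     */
    static Map<String, List<String>> invalidateDoubleOwnership(Map<String, List<String>> owned) {
        // Count how many distinct members claim each partition.
        Map<String, Integer> claims = new HashMap<>();
        for (List<String> partitions : owned.values()) {
            for (String tp : new HashSet<>(partitions)) {
                claims.merge(tp, 1, Integer::sum);
            }
        }
        // Keep only partitions with a single claimant; preserve member order.
        Map<String, List<String>> sanitized = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : owned.entrySet()) {
            List<String> kept = new ArrayList<>();
            for (String tp : e.getValue()) {
                if (claims.get(tp) == 1) {
                    kept.add(tp);
                }
            }
            sanitized.put(e.getKey(), kept);
        }
        return sanitized;
    }

    public static void main(String[] args) {
        Map<String, List<String>> owned = new LinkedHashMap<>();
        owned.put("consumer-A", List.of("qa-qa-cf-events-11", "qa-qa-cf-events-13"));
        owned.put("consumer-B", List.of("qa-qa-cf-events-13")); // hypothetical stale claim
        System.out.println(invalidateDoubleOwnership(owned));
        // {consumer-A=[qa-qa-cf-events-11], consumer-B=[]}
    }
}
```

Dropping the partition from both claimants, rather than guessing which claim is current, keeps the sketch simple; whether the real fix breaks such ties differently (for example by generation) is not stated in this description.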



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
