kosiakk commented on pull request #7539:
URL: https://github.com/apache/kafka/pull/7539#issuecomment-850399911
Depending on the assignment strategy, not all currently assigned partitions
will be rewoked. For example
`org.apache.kafka.clients.consumer.CooperativeStickyAssignor` tries to preserve
currently assigned partitions.
My Mock subclass replicates that behavour using a simple Set difference
calculation. I've just extended the Mock Consumer to overwrite `rebalance`
implementation, actually in Kotlin (here just to illustrate the intention)
```Kotlin
@Synchronized
override fun rebalance(newAssignment: Collection<TopicPartition>) {
val listener = lastRebalanceListener ?: error("Please call
`subscribe` before `rebalance`")
val revoked = assignment() - newAssignment
val assigned = newAssignment - assignment()
listener.onPartitionsRevoked(revoked)
super.rebalance(newAssignment)
listener.onPartitionsAssigned(assigned)
}
```
or a patch proposal for the main class in Java:
```Java
public synchronized void rebalance(Collection<TopicPartition>
newAssignment) {
this.records.clear();
// todo check this.subscriptions.rebalanceListener() for null
final Set<TopicPartition> revoked =
this.subscriptions.assignedPartitions();
revoked.removeAll(newAssignment);
final Set<TopicPartition> assigned = new HashSet<>(newAssignment);
assigned.removeAll(this.subscriptions.assignedPartitions());
this.subscriptions.rebalanceListener().onPartitionsRevoked(revoked);
this.subscriptions.assignFromSubscribed(newAssignment);
this.subscriptions.rebalanceListener().onPartitionsAssigned(assigned);
}
```
You can actually see this in the log of a real implementation when a second
node joins the group:
```
[consumerThread] INFO
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer
clientId=consumer-test1-1, groupId=test1] Updating assignment with
Assigned partitions: [INPUT-2, INPUT-3]
Current owned partitions: [INPUT-2, INPUT-3, INPUT-0,
INPUT-1]
Added partitions (assigned - owned): []
Revoked partitions (owned - assigned): [INPUT-0, INPUT-1]
```
and then debugger confirms that `onPartitionsRevoked` is called only with
**2-element set** with the difference, and then `onPartitionsAssigned` is
callsed with an **empty set**.
tl;dr: please don't revoke and then add again unnecessarily, it might be
expensive in the app
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]