dajac commented on code in PR #20907:
URL: https://github.com/apache/kafka/pull/20907#discussion_r2549727081
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1048,11 +1053,13 @@ void removePartitionEpochs(
currentPartitionEpoch.compute(topicId, (__, partitionsOrNull) -> {
if (partitionsOrNull != null) {
assignedPartitions.forEach(partitionId -> {
- Integer prevValue = partitionsOrNull.remove(partitionId);
+ Integer prevValue = partitionsOrNull.get(partitionId);
if (prevValue != expectedEpoch) {
- throw new IllegalStateException(
+ log.warn(
Review Comment:
I am not a fan of those warnings. I think that users will be confused by
them and they will think that something is wrong even if it is actually
expected. Should we log them as debug? I would also include the group id. The
message is meaningless without the group id.
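
A minimal sketch of what that could look like, assuming a debug-level log call whose message carries the group id. The class `EpochMismatchLog`, its method names, and the message wording are hypothetical and not taken from the PR; `System.Logger` is used only to keep the sketch free of dependencies, whereas the actual code would go through the existing `log` field.

```java
// Hypothetical helper for illustration only; not part of ConsumerGroup.
final class EpochMismatchLog {
    // Stand-in for the coordinator's logger (assumption: the real code uses its own `log` field).
    private static final System.Logger LOG = System.getLogger("EpochMismatchLog");

    // Builds a message that identifies the group, the partition, and both epochs.
    static String format(String groupId, String topicId, int partitionId,
                         int expectedEpoch, Integer actualEpoch) {
        return String.format(
            "[GroupId %s] Cannot remove the epoch %d from %s-%d because the partition is "
                + "owned at a different epoch %s.",
            groupId, expectedEpoch, topicId, partitionId, actualEpoch);
    }

    // Logs at DEBUG so that an expected mismatch does not alarm operators.
    static void logMismatch(String groupId, String topicId, int partitionId,
                            int expectedEpoch, Integer actualEpoch) {
        LOG.log(System.Logger.Level.DEBUG,
            format(groupId, topicId, partitionId, expectedEpoch, actualEpoch));
    }
}
```

Formatting `actualEpoch` with `%s` keeps the message usable when no previous epoch exists and the value is `null`.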
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1089,7 +1096,7 @@ void addPartitionEpochs(
for (Integer partitionId : assignedPartitions) {
Integer prevValue = partitionsOrNull.put(partitionId, epoch);
Review Comment:
While we are here, I wonder if we need to strengthen the logic. At minimum,
we could ensure that a member can only "acquire the lock" if it has an epoch
larger than the previous one. I am not sure whether we could have records
reordered to be in this situation though. @squah-confluent What do you think?
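
To make the idea concrete, here is a rough sketch of the stricter check, assuming the per-topic state is a map from partition id to owning epoch. `PartitionEpochGuard` and `tryAcquire` are hypothetical names, and rejecting an equal epoch is one possible reading of "larger than the previous one"; this is not the behaviour currently in `addPartitionEpochs`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not the ConsumerGroup implementation.
final class PartitionEpochGuard {
    // partition id -> epoch at which the partition is currently owned
    private final Map<Integer, Integer> epochs = new HashMap<>();

    // Records the new epoch only if the partition is unowned or the new
    // epoch is strictly larger than the recorded one. Returns whether the
    // acquisition succeeded; the existing entry is left untouched on failure.
    boolean tryAcquire(int partitionId, int epoch) {
        Integer prevEpoch = epochs.get(partitionId);
        if (prevEpoch != null && epoch <= prevEpoch) {
            return false;
        }
        epochs.put(partitionId, epoch);
        return true;
    }

    // Returns the recorded epoch, or null if the partition is unowned.
    Integer currentEpoch(int partitionId) {
        return epochs.get(partitionId);
    }
}
```

Reading the previous value with `get` before writing, rather than calling `put` and inspecting the result, means a rejected acquisition does not overwrite the existing owner's epoch.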
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.