dajac commented on code in PR #12886: URL: https://github.com/apache/kafka/pull/12886#discussion_r1066988206
########## clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java: ########## @@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) { public boolean shouldClientThrottle(short version) { return version >= 4; } + + public static class Builder { + OffsetCommitResponseData data = new OffsetCommitResponseData(); + HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>(); + + private OffsetCommitResponseTopic getOrCreateTopic( + String topicName + ) { + OffsetCommitResponseTopic topic = byTopicName.get(topicName); + if (topic == null) { + topic = new OffsetCommitResponseTopic().setName(topicName); + data.topics().add(topic); + byTopicName.put(topicName, topic); + } + return topic; + } + + public Builder addPartition( + String topicName, + int partitionIndex, + Errors error + ) { + final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); + + topicResponse.partitions().add(new OffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex) + .setErrorCode(error.code())); + + return this; + } + + public <P> Builder addPartitions( + String topicName, + List<P> partitions, + Function<P, Integer> partitionIndex, + Errors error + ) { + final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName); + + partitions.forEach(partition -> { + topicResponse.partitions().add(new OffsetCommitResponsePartition() + .setPartitionIndex(partitionIndex.apply(partition)) + .setErrorCode(error.code())); + }); + + return this; + } + + public Builder merge( + OffsetCommitResponseData newData + ) { + if (data.topics().isEmpty()) { + // If the current data is empty, we can discard it and use the new data. + data = newData; + } else { + // Otherwise, we have to merge them together. + newData.topics().forEach(newTopic -> { + OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name()); + if (existingTopic == null) { + // If no topic exists, we can directly copy the new topic data. + data.topics().add(newTopic); + byTopicName.put(newTopic.name(), newTopic); + } else { + // Otherwise, we add the partitions to the existing one. + existingTopic.partitions().addAll(newTopic.partitions()); Review Comment: That's right. I thought about adding a check but it is costly because the only way to check is to iterate over the existing partitions to check if the new one is there. Given that we know that partitions are not supposed to be duplicated by the user of this class, I thought that it was not necessary. What do you think? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org