jeffkbkim commented on code in PR #12902: URL: https://github.com/apache/kafka/pull/12902#discussion_r1071515902
########## clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java: ########## @@ -44,6 +48,83 @@ */ public class OffsetDeleteResponse extends AbstractResponse { + public static class Builder { + OffsetDeleteResponseData data = new OffsetDeleteResponseData(); + + private OffsetDeleteResponseTopic getOrCreateTopic( + String topicName + ) { + OffsetDeleteResponseTopic topic = data.topics().find(topicName); + if (topic == null) { + topic = new OffsetDeleteResponseTopic().setName(topicName); + data.topics().add(topic); + } + return topic; + } + + public Builder addPartition( + String topicName, + int partitionIndex, + Errors error + ) { + final OffsetDeleteResponseTopic topicResponse = getOrCreateTopic(topicName); + + topicResponse.partitions().add(new OffsetDeleteResponsePartition() + .setPartitionIndex(partitionIndex) + .setErrorCode(error.code())); + + return this; + } + + public <P> Builder addPartitions( + String topicName, + List<P> partitions, + Function<P, Integer> partitionIndex, + Errors error + ) { + final OffsetDeleteResponseTopic topicResponse = getOrCreateTopic(topicName); + + partitions.forEach(partition -> { + topicResponse.partitions().add(new OffsetDeleteResponsePartition() + .setPartitionIndex(partitionIndex.apply(partition)) + .setErrorCode(error.code())); + }); + + return this; + } + + public Builder merge( + OffsetDeleteResponseData newData + ) { + if (data.topics().isEmpty()) { + // If the current data is empty, we can discard it and use the new data. + data = newData; + } else { + // Otherwise, we have to merge them together. + newData.topics().forEach(newTopic -> { + OffsetDeleteResponseTopic existingTopic = data.topics().find(newTopic.name()); + if (existingTopic == null) { + // If no topic exists, we can directly copy the new topic data. + data.topics().add(newTopic.duplicate()); + } else { + // Otherwise, we add the partitions to the existing one. Note we + // expect non-overlapping partitions here as we don't verify + // if the partition is already in the list before adding it. + newTopic.partitions().forEach(partition -> { + existingTopic.partitions().add(partition.duplicate()); Review Comment: was wondering why we were duplicating and realized that we need to reset the element's prev and next values to insert -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org