jeffkbkim commented on code in PR #12902:
URL: https://github.com/apache/kafka/pull/12902#discussion_r1071515902


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java:
##########
@@ -44,6 +48,83 @@
  */
 public class OffsetDeleteResponse extends AbstractResponse {
 
+    public static class Builder {
+        OffsetDeleteResponseData data = new OffsetDeleteResponseData();
+
+        private OffsetDeleteResponseTopic getOrCreateTopic(
+            String topicName
+        ) {
+            OffsetDeleteResponseTopic topic = data.topics().find(topicName);
+            if (topic == null) {
+                topic = new OffsetDeleteResponseTopic().setName(topicName);
+                data.topics().add(topic);
+            }
+            return topic;
+        }
+
+        public Builder addPartition(
+            String topicName,
+            int partitionIndex,
+            Errors error
+        ) {
+            final OffsetDeleteResponseTopic topicResponse = 
getOrCreateTopic(topicName);
+
+            topicResponse.partitions().add(new OffsetDeleteResponsePartition()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(error.code()));
+
+            return this;
+        }
+
+        public <P> Builder addPartitions(
+            String topicName,
+            List<P> partitions,
+            Function<P, Integer> partitionIndex,
+            Errors error
+        ) {
+            final OffsetDeleteResponseTopic topicResponse = 
getOrCreateTopic(topicName);
+
+            partitions.forEach(partition -> {
+                topicResponse.partitions().add(new 
OffsetDeleteResponsePartition()
+                    .setPartitionIndex(partitionIndex.apply(partition))
+                    .setErrorCode(error.code()));
+            });
+
+            return this;
+        }
+
+        public Builder merge(
+            OffsetDeleteResponseData newData
+        ) {
+            if (data.topics().isEmpty()) {
+                // If the current data is empty, we can discard it and use the 
new data.
+                data = newData;
+            } else {
+                // Otherwise, we have to merge them together.
+                newData.topics().forEach(newTopic -> {
+                    OffsetDeleteResponseTopic existingTopic = 
data.topics().find(newTopic.name());
+                    if (existingTopic == null) {
+                        // If no topic exists, we can directly copy the new 
topic data.
+                        data.topics().add(newTopic.duplicate());
+                    } else {
+                        // Otherwise, we add the partitions to the existing 
one. Note we
+                        // expect non-overlapping partitions here as we don't 
verify
+                        // if the partition is already in the list before 
adding it.
+                        newTopic.partitions().forEach(partition -> {
+                            
existingTopic.partitions().add(partition.duplicate());

Review Comment:
   I was wondering why we were duplicating here, and realized that we need to reset the 
   element's prev and next values in order to insert it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to