jolshan commented on code in PR #12886:
URL: https://github.com/apache/kafka/pull/12886#discussion_r1067494981


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java:
##########
@@ -116,4 +118,80 @@ public void maybeSetThrottleTimeMs(int throttleTimeMs) {
     public boolean shouldClientThrottle(short version) {
         return version >= 4;
     }
+
+    public static class Builder {
+        OffsetCommitResponseData data = new OffsetCommitResponseData();
+        HashMap<String, OffsetCommitResponseTopic> byTopicName = new HashMap<>();
+
+        private OffsetCommitResponseTopic getOrCreateTopic(
+            String topicName
+        ) {
+            OffsetCommitResponseTopic topic = byTopicName.get(topicName);
+            if (topic == null) {
+                topic = new OffsetCommitResponseTopic().setName(topicName);
+                data.topics().add(topic);
+                byTopicName.put(topicName, topic);
+            }
+            return topic;
+        }
+
+        public Builder addPartition(
+            String topicName,
+            int partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                .setPartitionIndex(partitionIndex)
+                .setErrorCode(error.code()));
+
+            return this;
+        }
+
+        public <P> Builder addPartitions(
+            String topicName,
+            List<P> partitions,
+            Function<P, Integer> partitionIndex,
+            Errors error
+        ) {
+            final OffsetCommitResponseTopic topicResponse = getOrCreateTopic(topicName);
+
+            partitions.forEach(partition -> {
+                topicResponse.partitions().add(new OffsetCommitResponsePartition()
+                    .setPartitionIndex(partitionIndex.apply(partition))
+                    .setErrorCode(error.code()));
+            });
+
+            return this;
+        }
+
+        public Builder merge(
+            OffsetCommitResponseData newData
+        ) {
+            if (data.topics().isEmpty()) {
+                // If the current data is empty, we can discard it and use the new data.
+                data = newData;
+            } else {
+                // Otherwise, we have to merge them together.
+                newData.topics().forEach(newTopic -> {
+                    OffsetCommitResponseTopic existingTopic = byTopicName.get(newTopic.name());
+                    if (existingTopic == null) {
+                        // If no topic exists, we can directly copy the new topic data.
+                        data.topics().add(newTopic);
+                        byTopicName.put(newTopic.name(), newTopic);
+                    } else {
+                        // Otherwise, we add the partitions to the existing one.
+                        existingTopic.partitions().addAll(newTopic.partitions());

Review Comment:
   I think it is OK to keep this as is, but could we add a comment stating
   that we assume the merged responses contain no overlapping partitions?
   
   As a side note: if there were overlap, we would just end up with the same
   partition twice in the response, right? One entry with the error and one
   without?
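
To make the side note concrete, here is a minimal sketch of what I believe would happen on overlap. It does not use the real Kafka classes: `MergeOverlapSketch`, `Partition`, `Topic`, `merge` and `overlappingMerge` are hypothetical stand-ins that only mirror the shape of the merge loop in the diff, under the assumption that `partitions()` behaves like a plain list.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-ins for the generated Kafka message classes; every type
// and method name here is hypothetical and exists only for illustration.
class MergeOverlapSketch {

    static final class Partition {
        final int partitionIndex;
        final short errorCode;

        Partition(int partitionIndex, short errorCode) {
            this.partitionIndex = partitionIndex;
            this.errorCode = errorCode;
        }

        @Override
        public String toString() {
            return "Partition(index=" + partitionIndex + ", errorCode=" + errorCode + ")";
        }
    }

    static final class Topic {
        final String name;
        final List<Partition> partitions = new ArrayList<>();

        Topic(String name) {
            this.name = name;
        }
    }

    // Same shape as the merge loop in the diff: a new topic is added as is,
    // while partitions of a known topic are appended without de-duplication.
    static void merge(List<Topic> topics, Map<String, Topic> byTopicName, Topic newTopic) {
        Topic existingTopic = byTopicName.get(newTopic.name);
        if (existingTopic == null) {
            topics.add(newTopic);
            byTopicName.put(newTopic.name, newTopic);
        } else {
            existingTopic.partitions.addAll(newTopic.partitions);
        }
    }

    // Builds a response that already holds foo-0 with an error, then merges
    // in another foo-0 entry without one.
    static List<Partition> overlappingMerge() {
        List<Topic> topics = new ArrayList<>();
        Map<String, Topic> byTopicName = new HashMap<>();

        Topic failed = new Topic("foo");
        failed.partitions.add(new Partition(0, (short) 3)); // 3 = UNKNOWN_TOPIC_OR_PARTITION
        merge(topics, byTopicName, failed);

        Topic committed = new Topic("foo");
        committed.partitions.add(new Partition(0, (short) 0)); // 0 = NONE
        merge(topics, byTopicName, committed);

        return topics.get(0).partitions;
    }

    public static void main(String[] args) {
        overlappingMerge().forEach(System.out::println);
    }
}
```

In this sketch the topic ends up with two entries for partition 0, one per merged input, which is why a comment documenting the no-overlap assumption seems worthwhile.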



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
