dajac commented on code in PR #12870:
URL: https://github.com/apache/kafka/pull/12870#discussion_r1064618016


##########
clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java:
##########
@@ -208,6 +203,56 @@ public OffsetFetchResponse(int throttleTimeMs,
         this.error = null;
     }
 
+    public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short 
version) {
+        super(ApiKeys.OFFSET_FETCH);
+        data = new OffsetFetchResponseData();
+
+        if (version >= 8) {
+            data.setGroups(groups);
+            error = null;
+
+            for (OffsetFetchResponseGroup group : data.groups()) {
+                this.groupLevelErrors.put(group.groupId(), 
Errors.forCode(group.errorCode()));
+            }
+        } else {
+            if (groups.size() != 1) {
+                throw new UnsupportedVersionException(
+                    "Version " + version + " of OffsetFetchResponse only 
support one group."
+                );
+            }
+
+            OffsetFetchResponseGroup group = groups.get(0);
+            data.setErrorCode(group.errorCode());
+            error = Errors.forCode(group.errorCode());
+
+            group.topics().forEach(topic -> {
+                OffsetFetchResponseTopic newTopic = new 
OffsetFetchResponseTopic().setName(topic.name());
+                data.topics().add(newTopic);
+
+                topic.partitions().forEach(partition -> {
+                    OffsetFetchResponsePartition newPartition;
+
+                    if (version < 2 && group.errorCode() != 
Errors.NONE.code()) {
+                        // Versions prior to version 2 does not support a top 
level error. Therefore
+                        // we put it at the partition level.
+                        newPartition = new OffsetFetchResponsePartition()
+                            .setPartitionIndex(partition.partitionIndex())
+                            .setErrorCode(group.errorCode());
+                    } else {

Review Comment:
   It is still possible to have a partition level error with version >= 2 (e.g. 
UNSTABLE_OFFSET_COMMIT). To answer your second point, if there is an error, the 
offset/metadata should be correctly set at this stage so we can just copy 
whatever we have got here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to