lianetm commented on code in PR #19642: URL: https://github.com/apache/kafka/pull/19642#discussion_r2073862478
########## clients/src/main/java/org/apache/kafka/clients/admin/internals/ListConsumerGroupOffsetsHandler.java: ########## @@ -139,40 +139,52 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> handleR ) { validateKeys(groupIds); - final OffsetFetchResponse response = (OffsetFetchResponse) abstractResponse; + var response = (OffsetFetchResponse) abstractResponse; + var completed = new HashMap<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>>(); + var failed = new HashMap<CoordinatorKey, Throwable>(); + var unmapped = new ArrayList<CoordinatorKey>(); - Map<CoordinatorKey, Map<TopicPartition, OffsetAndMetadata>> completed = new HashMap<>(); - Map<CoordinatorKey, Throwable> failed = new HashMap<>(); - List<CoordinatorKey> unmapped = new ArrayList<>(); for (CoordinatorKey coordinatorKey : groupIds) { - String group = coordinatorKey.idValue; - if (response.groupHasError(group)) { - handleGroupError(CoordinatorKey.byGroupId(group), response.groupLevelError(group), failed, unmapped); + var groupId = coordinatorKey.idValue; + var group = response.group(groupId); + var error = Errors.forCode(group.errorCode()); + + if (error != Errors.NONE) { + handleGroupError( + coordinatorKey, + error, + failed, + unmapped + ); } else { - final Map<TopicPartition, OffsetAndMetadata> groupOffsetsListing = new HashMap<>(); - Map<TopicPartition, OffsetFetchResponse.PartitionData> responseData = response.partitionDataMap(group); - for (Map.Entry<TopicPartition, OffsetFetchResponse.PartitionData> partitionEntry : responseData.entrySet()) { - final TopicPartition topicPartition = partitionEntry.getKey(); - OffsetFetchResponse.PartitionData partitionData = partitionEntry.getValue(); - final Errors error = partitionData.error; - - if (error == Errors.NONE) { - final long offset = partitionData.offset; - final String metadata = partitionData.metadata; - final Optional<Integer> leaderEpoch = partitionData.leaderEpoch; - // Negative offset indicates that the group has 
no committed offset for this partition - if (offset < 0) { - groupOffsetsListing.put(topicPartition, null); + var offsets = new HashMap<TopicPartition, OffsetAndMetadata>(); + + group.topics().forEach(topic -> { Review Comment: nit: I expect we don't need this `{`. ########## clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java: ########## @@ -261,20 +114,53 @@ public OffsetFetchResponse(List<OffsetFetchResponseGroup> groups, short version) public OffsetFetchResponse(OffsetFetchResponseData data, short version) { super(ApiKeys.OFFSET_FETCH); this.data = data; - // for version 2 and later use the top-level error code (in ERROR_CODE_KEY_NAME) from the response. - // for older versions there is no top-level error in the response and all errors are partition errors, - // so if there is a group or coordinator error at the partition level use that as the top-level error. - // this way clients can depend on the top-level error regardless of the offset fetch version. - // we return the error differently starting with version 8, so we will only populate the - // error field if we are between version 2 and 7. if we are in version 8 or greater, then - // we will populate the map of group id to error codes. + this.version = version; + } + + private Map<String, OffsetFetchResponseData.OffsetFetchResponseGroup> groups = null; Review Comment: Very nice building block here. It may be worth adding a comment explaining that this structure now keeps normalized data per group, regardless of whether the request version carries data and errors per group. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org