dajac commented on a change in pull request #9547: URL: https://github.com/apache/kafka/pull/9547#discussion_r526373445
########## File path: clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java ########## @@ -51,133 +41,82 @@ * - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors */ public class OffsetsForLeaderEpochResponse extends AbstractResponse { - private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics", - "An array of topics for which we have leader offsets for some requested partition leader epoch"); - private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions", - "An array of offsets by partition"); - private static final Field.Int64 END_OFFSET = new Field.Int64("end_offset", "The end offset"); - - private static final Field PARTITIONS_V0 = PARTITIONS.withFields( - ERROR_CODE, - PARTITION_ID, - END_OFFSET); - private static final Field TOPICS_V0 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V0); - private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0 = new Schema( - TOPICS_V0); - - // V1 added a per-partition leader epoch field which specifies which leader epoch the end offset belongs to - private static final Field PARTITIONS_V1 = PARTITIONS.withFields( - ERROR_CODE, - PARTITION_ID, - LEADER_EPOCH, - END_OFFSET); - private static final Field TOPICS_V1 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V1); - private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1 = new Schema( - TOPICS_V1); - - // V2 bumped for addition of current leader epoch to the request schema and the addition of the throttle - // time in the response - private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2 = new Schema( - THROTTLE_TIME_MS, - TOPICS_V1); - - private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V3 = OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2; - - - public static Schema[] schemaVersions() { - return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0, OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1, - OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2, OFFSET_FOR_LEADER_EPOCH_RESPONSE_V3}; + + private final OffsetForLeaderEpochResponseData data; + + public OffsetsForLeaderEpochResponse(OffsetForLeaderEpochResponseData data) { + this.data = data; } - private final int throttleTimeMs; - private final Map<TopicPartition, EpochEndOffset> epochEndOffsetsByPartition; - - public OffsetsForLeaderEpochResponse(Struct struct) { - this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME); - this.epochEndOffsetsByPartition = new HashMap<>(); - for (Object topicAndEpocsObj : struct.get(TOPICS)) { - Struct topicAndEpochs = (Struct) topicAndEpocsObj; - String topic = topicAndEpochs.get(TOPIC_NAME); - for (Object partitionAndEpochObj : topicAndEpochs.get(PARTITIONS)) { - Struct partitionAndEpoch = (Struct) partitionAndEpochObj; - Errors error = Errors.forCode(partitionAndEpoch.get(ERROR_CODE)); - int partitionId = partitionAndEpoch.get(PARTITION_ID); - TopicPartition tp = new TopicPartition(topic, partitionId); - int leaderEpoch = partitionAndEpoch.getOrElse(LEADER_EPOCH, RecordBatch.NO_PARTITION_LEADER_EPOCH); - long endOffset = partitionAndEpoch.get(END_OFFSET); - epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, leaderEpoch, endOffset)); - } - } + public OffsetsForLeaderEpochResponse(Struct struct, short version) { + data = new OffsetForLeaderEpochResponseData(struct, version); + } + + public OffsetsForLeaderEpochResponse(Map<TopicPartition, EpochEndOffset> offsets) { + this(0, offsets); } - public OffsetsForLeaderEpochResponse(Map<TopicPartition, EpochEndOffset> epochsByTopic) { - this(DEFAULT_THROTTLE_TIME, epochsByTopic); + public OffsetsForLeaderEpochResponse(int throttleTimeMs, Map<TopicPartition, EpochEndOffset> offsets) { + data = new OffsetForLeaderEpochResponseData(); + data.setThrottleTimeMs(throttleTimeMs); + + offsets.forEach((tp, offset) -> { + OffsetForLeaderTopicResult topic = data.topics().find(tp.topic()); + if (topic == null) { + topic = new OffsetForLeaderTopicResult().setTopic(tp.topic()); + data.topics().add(topic); + } + topic.partitions().add(new OffsetForLeaderPartitionResult() + .setPartition(tp.partition()) + .setErrorCode(offset.error().code()) + .setLeaderEpoch(offset.leaderEpoch()) + .setEndOffset(offset.endOffset())); + }); } - public OffsetsForLeaderEpochResponse(int throttleTimeMs, Map<TopicPartition, EpochEndOffset> epochsByTopic) { - this.throttleTimeMs = throttleTimeMs; - this.epochEndOffsetsByPartition = epochsByTopic; + public OffsetForLeaderEpochResponseData data() { + return data; } public Map<TopicPartition, EpochEndOffset> responses() { Review comment: I will address this in a follow-up PR. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org