dajac commented on a change in pull request #9547:
URL: https://github.com/apache/kafka/pull/9547#discussion_r526373445



##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/OffsetsForLeaderEpochResponse.java
##########
@@ -51,133 +41,82 @@
  * - {@link Errors#UNKNOWN_SERVER_ERROR} For any unexpected errors
  */
 public class OffsetsForLeaderEpochResponse extends AbstractResponse {
-    private static final Field.ComplexArray TOPICS = new 
Field.ComplexArray("topics",
-            "An array of topics for which we have leader offsets for some 
requested partition leader epoch");
-    private static final Field.ComplexArray PARTITIONS = new 
Field.ComplexArray("partitions",
-            "An array of offsets by partition");
-    private static final Field.Int64 END_OFFSET = new 
Field.Int64("end_offset", "The end offset");
-
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            ERROR_CODE,
-            PARTITION_ID,
-            END_OFFSET);
-    private static final Field TOPICS_V0 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0 = new 
Schema(
-            TOPICS_V0);
-
-    // V1 added a per-partition leader epoch field which specifies which 
leader epoch the end offset belongs to
-    private static final Field PARTITIONS_V1 = PARTITIONS.withFields(
-            ERROR_CODE,
-            PARTITION_ID,
-            LEADER_EPOCH,
-            END_OFFSET);
-    private static final Field TOPICS_V1 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V1);
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1 = new 
Schema(
-            TOPICS_V1);
-
-    // V2 bumped for addition of current leader epoch to the request schema 
and the addition of the throttle
-    // time in the response
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2 = new 
Schema(
-            THROTTLE_TIME_MS,
-            TOPICS_V1);
-
-    private static final Schema OFFSET_FOR_LEADER_EPOCH_RESPONSE_V3 = 
OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2;
-
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{OFFSET_FOR_LEADER_EPOCH_RESPONSE_V0, 
OFFSET_FOR_LEADER_EPOCH_RESPONSE_V1,
-            OFFSET_FOR_LEADER_EPOCH_RESPONSE_V2, 
OFFSET_FOR_LEADER_EPOCH_RESPONSE_V3};
+
+    private final OffsetForLeaderEpochResponseData data;
+
+    public OffsetsForLeaderEpochResponse(OffsetForLeaderEpochResponseData 
data) {
+        this.data = data;
     }
 
-    private final int throttleTimeMs;
-    private final Map<TopicPartition, EpochEndOffset> 
epochEndOffsetsByPartition;
-
-    public OffsetsForLeaderEpochResponse(Struct struct) {
-        this.throttleTimeMs = struct.getOrElse(THROTTLE_TIME_MS, 
DEFAULT_THROTTLE_TIME);
-        this.epochEndOffsetsByPartition = new HashMap<>();
-        for (Object topicAndEpocsObj : struct.get(TOPICS)) {
-            Struct topicAndEpochs = (Struct) topicAndEpocsObj;
-            String topic = topicAndEpochs.get(TOPIC_NAME);
-            for (Object partitionAndEpochObj : topicAndEpochs.get(PARTITIONS)) 
{
-                Struct partitionAndEpoch = (Struct) partitionAndEpochObj;
-                Errors error = 
Errors.forCode(partitionAndEpoch.get(ERROR_CODE));
-                int partitionId = partitionAndEpoch.get(PARTITION_ID);
-                TopicPartition tp = new TopicPartition(topic, partitionId);
-                int leaderEpoch = partitionAndEpoch.getOrElse(LEADER_EPOCH, 
RecordBatch.NO_PARTITION_LEADER_EPOCH);
-                long endOffset = partitionAndEpoch.get(END_OFFSET);
-                epochEndOffsetsByPartition.put(tp, new EpochEndOffset(error, 
leaderEpoch, endOffset));
-            }
-        }
+    public OffsetsForLeaderEpochResponse(Struct struct, short version) {
+        data = new OffsetForLeaderEpochResponseData(struct, version);
+    }
+
+    public OffsetsForLeaderEpochResponse(Map<TopicPartition, EpochEndOffset> 
offsets) {
+        this(0, offsets);
     }
 
-    public OffsetsForLeaderEpochResponse(Map<TopicPartition, EpochEndOffset> 
epochsByTopic) {
-        this(DEFAULT_THROTTLE_TIME, epochsByTopic);
+    public OffsetsForLeaderEpochResponse(int throttleTimeMs, 
Map<TopicPartition, EpochEndOffset> offsets) {
+        data = new OffsetForLeaderEpochResponseData();
+        data.setThrottleTimeMs(throttleTimeMs);
+
+        offsets.forEach((tp, offset) -> {
+            OffsetForLeaderTopicResult topic = data.topics().find(tp.topic());
+            if (topic == null) {
+                topic = new OffsetForLeaderTopicResult().setTopic(tp.topic());
+                data.topics().add(topic);
+            }
+            topic.partitions().add(new OffsetForLeaderPartitionResult()
+                .setPartition(tp.partition())
+                .setErrorCode(offset.error().code())
+                .setLeaderEpoch(offset.leaderEpoch())
+                .setEndOffset(offset.endOffset()));
+        });
     }
 
-    public OffsetsForLeaderEpochResponse(int throttleTimeMs, 
Map<TopicPartition, EpochEndOffset> epochsByTopic) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.epochEndOffsetsByPartition = epochsByTopic;
+    public OffsetForLeaderEpochResponseData data() {
+        return data;
     }
 
     public Map<TopicPartition, EpochEndOffset> responses() {

Review comment:
       I will address this in a follow-up PR.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to