mimaison commented on a change in pull request #8295:
URL: https://github.com/apache/kafka/pull/8295#discussion_r464886489



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java
##########
@@ -47,96 +42,11 @@
     public static final int CONSUMER_REPLICA_ID = -1;
     public static final int DEBUGGING_REPLICA_ID = -2;
 
-    // top level fields
-    private static final Field.Int32 REPLICA_ID = new Field.Int32("replica_id",
-            "Broker id of the follower. For normal consumers, use -1.");
-    private static final Field.Int8 ISOLATION_LEVEL = new Field.Int8("isolation_level",
-            "This setting controls the visibility of transactional records. " +
-                    "Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED " +
-                    "(isolation_level = 1), non-transactional and COMMITTED transactional records are visible. " +
-                    "To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current " +
-                    "LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the " +
-                    "result, which allows consumers to discard ABORTED transactional records");
-    private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics",
-            "Topics to list offsets.");
-
-    // topic level fields
-    private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions",
-            "Partitions to list offsets.");
-
-    // partition level fields
-    private static final Field.Int64 TIMESTAMP = new Field.Int64("timestamp",
-            "The target timestamp for the partition.");
-    private static final Field.Int32 MAX_NUM_OFFSETS = new Field.Int32("max_num_offsets",
-            "Maximum offsets to return.");
-
-    private static final Field PARTITIONS_V0 = PARTITIONS.withFields(
-            PARTITION_ID,
-            TIMESTAMP,
-            MAX_NUM_OFFSETS);
-
-    private static final Field TOPICS_V0 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V0);
-
-    private static final Schema LIST_OFFSET_REQUEST_V0 = new Schema(
-            REPLICA_ID,
-            TOPICS_V0);
-
-    // V1 removes max_num_offsets
-    private static final Field PARTITIONS_V1 = PARTITIONS.withFields(
-            PARTITION_ID,
-            TIMESTAMP);
-
-    private static final Field TOPICS_V1 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V1);
-
-    private static final Schema LIST_OFFSET_REQUEST_V1 = new Schema(
-            REPLICA_ID,
-            TOPICS_V1);
-
-    // V2 adds a field for the isolation level
-    private static final Schema LIST_OFFSET_REQUEST_V2 = new Schema(
-            REPLICA_ID,
-            ISOLATION_LEVEL,
-            TOPICS_V1);
-
-    // V3 bump used to indicate that on quota violation brokers send out responses before throttling.
-    private static final Schema LIST_OFFSET_REQUEST_V3 = LIST_OFFSET_REQUEST_V2;
-
-    // V4 introduces the current leader epoch, which is used for fencing
-    private static final Field PARTITIONS_V4 = PARTITIONS.withFields(
-            PARTITION_ID,
-            CURRENT_LEADER_EPOCH,
-            TIMESTAMP);
-
-    private static final Field TOPICS_V4 = TOPICS.withFields(
-            TOPIC_NAME,
-            PARTITIONS_V4);
-
-    private static final Schema LIST_OFFSET_REQUEST_V4 = new Schema(
-            REPLICA_ID,
-            ISOLATION_LEVEL,
-            TOPICS_V4);
-
-    // V5 bump to include new possible error code (OFFSET_NOT_AVAILABLE)
-    private static final Schema LIST_OFFSET_REQUEST_V5 = LIST_OFFSET_REQUEST_V4;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2,
-            LIST_OFFSET_REQUEST_V3, LIST_OFFSET_REQUEST_V4, LIST_OFFSET_REQUEST_V5};
-    }
-
-    private final int replicaId;
-    private final IsolationLevel isolationLevel;
-    private final Map<TopicPartition, PartitionData> partitionTimestamps;
+    private final ListOffsetRequestData data;
     private final Set<TopicPartition> duplicatePartitions;

Review comment:
       There's a check on `duplicatePartitions` in KafkaApis. I'd rather keep the existing logic for now. We can see if we can strip this in a follow-up PR.
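
For context, a minimal sketch of the kind of duplicate tracking being kept, built while flattening the generated `ListOffsetRequestData` into per-partition entries. This is an illustration only; the nested class and accessor names (`ListOffsetTopic`, `ListOffsetPartition`, `partitionIndex()`, `timestamp()`) are assumed from the generated protocol classes and may not match this PR exactly:

```java
// Hedged sketch, not the PR's code: collect partitions that appear more than
// once in the request so KafkaApis can keep rejecting them up front.
Set<TopicPartition> duplicatePartitions = new HashSet<>();
Map<TopicPartition, Long> seen = new HashMap<>();
for (ListOffsetRequestData.ListOffsetTopic topic : data.topics()) {
    for (ListOffsetRequestData.ListOffsetPartition partition : topic.partitions()) {
        TopicPartition tp = new TopicPartition(topic.name(), partition.partitionIndex());
        // putIfAbsent returns the previous value when the key already exists,
        // i.e. the partition was listed twice in the request.
        if (seen.putIfAbsent(tp, partition.timestamp()) != null)
            duplicatePartitions.add(tp);
    }
}
```

Keeping the set on the request object means the broker-side handler can answer duplicates with an error before doing any offset lookups, which is presumably why the existing check is worth preserving until a follow-up.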




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

