mimaison commented on a change in pull request #8295: URL: https://github.com/apache/kafka/pull/8295#discussion_r464886489
########## File path: clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java ########## @@ -47,96 +42,11 @@ public static final int CONSUMER_REPLICA_ID = -1; public static final int DEBUGGING_REPLICA_ID = -2; - // top level fields - private static final Field.Int32 REPLICA_ID = new Field.Int32("replica_id", - "Broker id of the follower. For normal consumers, use -1."); - private static final Field.Int8 ISOLATION_LEVEL = new Field.Int8("isolation_level", - "This setting controls the visibility of transactional records. " + - "Using READ_UNCOMMITTED (isolation_level = 0) makes all records visible. With READ_COMMITTED " + - "(isolation_level = 1), non-transactional and COMMITTED transactional records are visible. " + - "To be more concrete, READ_COMMITTED returns all data from offsets smaller than the current " + - "LSO (last stable offset), and enables the inclusion of the list of aborted transactions in the " + - "result, which allows consumers to discard ABORTED transactional records"); - private static final Field.ComplexArray TOPICS = new Field.ComplexArray("topics", - "Topics to list offsets."); - - // topic level fields - private static final Field.ComplexArray PARTITIONS = new Field.ComplexArray("partitions", - "Partitions to list offsets."); - - // partition level fields - private static final Field.Int64 TIMESTAMP = new Field.Int64("timestamp", - "The target timestamp for the partition."); - private static final Field.Int32 MAX_NUM_OFFSETS = new Field.Int32("max_num_offsets", - "Maximum offsets to return."); - - private static final Field PARTITIONS_V0 = PARTITIONS.withFields( - PARTITION_ID, - TIMESTAMP, - MAX_NUM_OFFSETS); - - private static final Field TOPICS_V0 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V0); - - private static final Schema LIST_OFFSET_REQUEST_V0 = new Schema( - REPLICA_ID, - TOPICS_V0); - - // V1 removes max_num_offsets - private static final Field PARTITIONS_V1 = PARTITIONS.withFields( - PARTITION_ID, - TIMESTAMP); - - private static final Field TOPICS_V1 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V1); - - private static final Schema LIST_OFFSET_REQUEST_V1 = new Schema( - REPLICA_ID, - TOPICS_V1); - - // V2 adds a field for the isolation level - private static final Schema LIST_OFFSET_REQUEST_V2 = new Schema( - REPLICA_ID, - ISOLATION_LEVEL, - TOPICS_V1); - - // V3 bump used to indicate that on quota violation brokers send out responses before throttling. - private static final Schema LIST_OFFSET_REQUEST_V3 = LIST_OFFSET_REQUEST_V2; - - // V4 introduces the current leader epoch, which is used for fencing - private static final Field PARTITIONS_V4 = PARTITIONS.withFields( - PARTITION_ID, - CURRENT_LEADER_EPOCH, - TIMESTAMP); - - private static final Field TOPICS_V4 = TOPICS.withFields( - TOPIC_NAME, - PARTITIONS_V4); - - private static final Schema LIST_OFFSET_REQUEST_V4 = new Schema( - REPLICA_ID, - ISOLATION_LEVEL, - TOPICS_V4); - - // V5 bump to include new possible error code (OFFSET_NOT_AVAILABLE) - private static final Schema LIST_OFFSET_REQUEST_V5 = LIST_OFFSET_REQUEST_V4; - - public static Schema[] schemaVersions() { - return new Schema[] {LIST_OFFSET_REQUEST_V0, LIST_OFFSET_REQUEST_V1, LIST_OFFSET_REQUEST_V2, - LIST_OFFSET_REQUEST_V3, LIST_OFFSET_REQUEST_V4, LIST_OFFSET_REQUEST_V5}; - } - - private final int replicaId; - private final IsolationLevel isolationLevel; - private final Map<TopicPartition, PartitionData> partitionTimestamps; + private final ListOffsetRequestData data; private final Set<TopicPartition> duplicatePartitions; Review comment: There's a check on `duplicatePartitions` in KafkaApis. I'd rather keep the existing logic for now. We can see if we can strip this in a follow up PR ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org