hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r461173162



##########
File path: clients/src/main/resources/common/message/FetchRequest.json
##########
@@ -55,35 +55,35 @@
       "about": "The minimum bytes to accumulate in the response." },
     { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": 
"0x7fffffff", "ignorable": true,
       "about": "The maximum bytes to fetch.  See KIP-74 for cases where this 
limit may not be honored." },
-    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": 
"0", "ignorable": false,
+    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": 
"0", "ignorable": true,

Review comment:
       I guess the implicit expectation is that if the protocol does not 
support the `read_committed` isolation level, then it wouldn't have 
transactional data anyway, so reverting to `read_uncommitted` is safe. Can't 
find a fault with that.

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##########
@@ -146,7 +147,7 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Str
             case PRODUCE:
                 return new ProduceRequest(struct, apiVersion);
             case FETCH:
-                return new FetchRequest(struct, apiVersion);
+                return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion);

Review comment:
       nit: any reason not to stick with the same constructor convention as the 
other requests?
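
For illustration only, a sketch of what keeping that convention might look like (a hypothetical convenience constructor on `FetchRequest`, not code from this PR):

```java
// Hypothetical overload so the parseRequest switch could keep the same shape as
// the other request types, i.e. `return new FetchRequest(struct, apiVersion);`
public FetchRequest(Struct struct, short apiVersion) {
    this(new FetchRequestData(struct, apiVersion), apiVersion);
}
```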

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+    private final String dest;
+    private final Consumer<Send> sendConsumer;
+    private final ByteBuffer buffer;
+    private int mark;
+
+    public RecordsWriter(String dest, int totalSize, Consumer<Send> sendConsumer) {

Review comment:
       Could we rename `totalSize` so that it is clear that it does not cover the record sizes? Maybe `totalOverheadSize` or `totalNonRecordSize` or something like that.

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * <pre>
+ *     recordsWritable.writeInt(10);
+ *     recordsWritable.writeRecords(records1);
+ *     recordsWritable.writeInt(20);
+ *     recordsWritable.writeRecords(records2);
+ *     recordsWritable.writeInt(30);
+ *     recordsWritable.flush();
+ * </pre>
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {

Review comment:
       nit: wonder if this should be `RecordsWritable` for consistency with 
`Writable`.

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {
+    private final ByteBuffer buf;
+
+    public RecordsReader(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    @Override
+    public byte readByte() {
+        return buf.get();
+    }
+
+    @Override
+    public short readShort() {
+        return buf.getShort();
+    }
+
+    @Override
+    public int readInt() {
+        return buf.getInt();
+    }
+
+    @Override
+    public long readLong() {
+        return buf.getLong();
+    }
+
+    @Override
+    public double readDouble() {
+        return ByteUtils.readDouble(buf);
+    }
+
+    @Override
+    public void readArray(byte[] arr) {
+        buf.get(arr);
+    }
+
+    @Override
+    public int readUnsignedVarint() {
+        return ByteUtils.readUnsignedVarint(buf);
+    }
+
+    @Override
+    public ByteBuffer readByteBuffer(int length) {

Review comment:
       More of a side question, but is this length guaranteed to be less than 
the buffer size? Wondering if it is worth adding range checking.
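
As a rough sketch of the kind of range check being asked about (assuming the method hands out a slice of `buf`; the exception choice is just a placeholder, not a suggestion for the actual error handling):

```java
@Override
public ByteBuffer readByteBuffer(int length) {
    // Reject lengths that exceed what is actually left in the buffer rather
    // than letting a later read fail in a less obvious way.
    if (length > buf.remaining()) {
        throw new IllegalArgumentException(
            "Requested " + length + " bytes but only " + buf.remaining() + " remain");
    }
    ByteBuffer res = buf.slice();
    res.limit(length);
    buf.position(buf.position() + length);
    return res;
}
```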

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
                         LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                         int throttleTimeMs,
                         int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
-                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long producerId = abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.data = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        return data.toStruct(version);
     }
 
     @Override
-    protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
-
-        // write the total size and the response header
-        ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
-        buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
-        responseHeaderStruct.writeTo(buffer);
+    public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        int totalRecordSize = data.responses().stream()
+                .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+                .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+                .sum();
+        int totalMessageSize = data.size(cache, apiVersion);
+
+        RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add);
+        data.write(writer, cache, apiVersion);
+        writer.flush();

Review comment:
       nit: not a big deal, but I feel like calling `flush` should really be 
the responsibility of `write`.
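
Purely as a sketch of that idea (a hypothetical convenience method on the writer, since the generated `write` itself comes from the message generator):

```java
// Hypothetical helper on RecordsWriter: always flush the pending bytes after
// writing a message so callers cannot forget the trailing flush() call.
public void writeMessage(ApiMessage message, ObjectSerializationCache cache, short version) {
    message.write(this, cache, version);
    flush();
}
```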

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
                         LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                         int throttleTimeMs,
                         int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
-                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long producerId = abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.data = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        return data.toStruct(version);
     }
 
     @Override
-    protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
-
-        // write the total size and the response header
-        ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
-        buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
-        responseHeaderStruct.writeTo(buffer);
+    public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        int totalRecordSize = data.responses().stream()
+                .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+                .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+                .sum();
+        int totalMessageSize = data.size(cache, apiVersion);
+
+        RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add);
+        data.write(writer, cache, apiVersion);
+        writer.flush();
+
+        // Compute the total size of all the Sends and write it out along with the header in the first Send
+        ResponseHeaderData responseHeaderData = responseHeader.data();
+
+        int headerSize = responseHeaderData.size(cache, responseHeader.headerVersion());
+        int bodySize = (int) sends.stream().mapToLong(Send::size).sum();

Review comment:
       Instead of the cast, could we add a validation check?
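
For example, something along these lines (just a sketch; `Math.toIntExact` throws `ArithmeticException` if the sum does not fit in an int):

```java
// Fail fast if the summed Send sizes overflow an int instead of silently
// truncating them with a cast.
long bodySizeLong = sends.stream().mapToLong(Send::size).sum();
int bodySize = Math.toIntExact(bodySizeLong);
```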

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java
##########
@@ -16,5 +16,16 @@
  */
 package org.apache.kafka.common.requests;
 
+import org.apache.kafka.common.protocol.ApiMessage;
+
 public interface AbstractRequestResponse {
+    /**
+     * Return the auto-generated `Message` instance if this request/response relies on one for
+     * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol
+     * classes, return `null`.
+     * @return
+     */
+    default ApiMessage data() {

Review comment:
       @mumrah Do we need this for this PR or can we leave this for #7409?

##########
File path: clients/src/main/resources/common/message/FetchResponse.json
##########
@@ -43,37 +43,39 @@
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
-    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": 
false,
+    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": 
true,
       "about": "The top level response error code." },
     { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", 
"ignorable": false,
       "about": "The fetch session ID, or 0 if this is not part of a fetch 
session." },
-    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
+    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": 
"0+",
       "about": "The response topics.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+      { "name": "Topic", "type": "string", "versions": "0+", "entityType": 
"topicName",
         "about": "The topic name." },
-      { "name": "Partitions", "type": "[]FetchablePartitionResponse", 
"versions": "0+",
+      { "name": "PartitionResponses", "type": "[]FetchablePartitionResponse", 
"versions": "0+",
         "about": "The topic partitions.", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
-          "about": "The partiiton index." },
-        { "name": "ErrorCode", "type": "int16", "versions": "0+",
-          "about": "The error code, or 0 if there was no fetch error." },
-        { "name": "HighWatermark", "type": "int64", "versions": "0+",
-          "about": "The current high water mark." },
-        { "name": "LastStableOffset", "type": "int64", "versions": "4+", 
"default": "-1", "ignorable": true,
-          "about": "The last stable offset (or LSO) of the partition. This is 
the last offset such that the state of all transactional records prior to this 
offset have been decided (ABORTED or COMMITTED)" },
-        { "name": "LogStartOffset", "type": "int64", "versions": "5+", 
"default": "-1", "ignorable": true,
-          "about": "The current log start offset." },
-        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", 
"nullableVersions": "4+", "ignorable": false,
-          "about": "The aborted transactions.",  "fields": [
-          { "name": "ProducerId", "type": "int64", "versions": "4+", 
"entityType": "producerId",
-            "about": "The producer id associated with the aborted 
transaction." },
-          { "name": "FirstOffset", "type": "int64", "versions": "4+",
-            "about": "The first offset in the aborted transaction." }
+        { "name":  "PartitionHeader", "type": "PartitionHeader", "versions": 
"0+",
+          "fields":  [
+          { "name": "Partition", "type": "int32", "versions": "0+",
+            "about": "The partition index." },
+          { "name": "ErrorCode", "type": "int16", "versions": "0+",
+            "about": "The error code, or 0 if there was no fetch error." },
+          { "name": "HighWatermark", "type": "int64", "versions": "0+",
+            "about": "The current high water mark." },
+          { "name": "LastStableOffset", "type": "int64", "versions": "4+", 
"default": "-1", "ignorable": true,
+            "about": "The last stable offset (or LSO) of the partition. This 
is the last offset such that the state of all transactional records prior to 
this offset have been decided (ABORTED or COMMITTED)" },
+          { "name": "LogStartOffset", "type": "int64", "versions": "5+", 
"default": "-1", "ignorable": true,
+            "about": "The current log start offset." },
+          { "name": "AbortedTransactions", "type": "[]AbortedTransaction", 
"versions": "4+", "nullableVersions": "4+", "ignorable": true,
+            "about": "The aborted transactions.",  "fields": [
+            { "name": "ProducerId", "type": "int64", "versions": "4+", 
"entityType": "producerId",
+              "about": "The producer id associated with the aborted 
transaction." },
+            { "name": "FirstOffset", "type": "int64", "versions": "4+",
+              "about": "The first offset in the aborted transaction." }
+          ]},
+          { "name": "PreferredReadReplica", "type": "int32", "versions": 
"11+", "default": "-1", "ignorable": true,

Review comment:
       I'm wondering if this should be ignorable. When this is set, the leader 
returns no data, so it relies crucially on the follower redirection.

##########
File path: clients/src/main/resources/common/message/FetchResponse.json
##########
@@ -43,37 +43,39 @@
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
-    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": 
false,
+    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": 
true,
       "about": "The top level response error code." },
     { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", 
"ignorable": false,
       "about": "The fetch session ID, or 0 if this is not part of a fetch 
session." },
-    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
+    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": 
"0+",
       "about": "The response topics.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+      { "name": "Topic", "type": "string", "versions": "0+", "entityType": 
"topicName",
         "about": "The topic name." },
-      { "name": "Partitions", "type": "[]FetchablePartitionResponse", 
"versions": "0+",
+      { "name": "PartitionResponses", "type": "[]FetchablePartitionResponse", 
"versions": "0+",
         "about": "The topic partitions.", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
-          "about": "The partiiton index." },
-        { "name": "ErrorCode", "type": "int16", "versions": "0+",
-          "about": "The error code, or 0 if there was no fetch error." },
-        { "name": "HighWatermark", "type": "int64", "versions": "0+",
-          "about": "The current high water mark." },
-        { "name": "LastStableOffset", "type": "int64", "versions": "4+", 
"default": "-1", "ignorable": true,
-          "about": "The last stable offset (or LSO) of the partition. This is 
the last offset such that the state of all transactional records prior to this 
offset have been decided (ABORTED or COMMITTED)" },
-        { "name": "LogStartOffset", "type": "int64", "versions": "5+", 
"default": "-1", "ignorable": true,
-          "about": "The current log start offset." },
-        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", 
"nullableVersions": "4+", "ignorable": false,
-          "about": "The aborted transactions.",  "fields": [
-          { "name": "ProducerId", "type": "int64", "versions": "4+", 
"entityType": "producerId",
-            "about": "The producer id associated with the aborted 
transaction." },
-          { "name": "FirstOffset", "type": "int64", "versions": "4+",
-            "about": "The first offset in the aborted transaction." }
+        { "name":  "PartitionHeader", "type": "PartitionHeader", "versions": 
"0+",
+          "fields":  [
+          { "name": "Partition", "type": "int32", "versions": "0+",
+            "about": "The partition index." },
+          { "name": "ErrorCode", "type": "int16", "versions": "0+",
+            "about": "The error code, or 0 if there was no fetch error." },
+          { "name": "HighWatermark", "type": "int64", "versions": "0+",
+            "about": "The current high water mark." },
+          { "name": "LastStableOffset", "type": "int64", "versions": "4+", 
"default": "-1", "ignorable": true,
+            "about": "The last stable offset (or LSO) of the partition. This 
is the last offset such that the state of all transactional records prior to 
this offset have been decided (ABORTED or COMMITTED)" },
+          { "name": "LogStartOffset", "type": "int64", "versions": "5+", 
"default": "-1", "ignorable": true,
+            "about": "The current log start offset." },
+          { "name": "AbortedTransactions", "type": "[]AbortedTransaction", 
"versions": "4+", "nullableVersions": "4+", "ignorable": true,
+            "about": "The aborted transactions.",  "fields": [
+            { "name": "ProducerId", "type": "int64", "versions": "4+", 
"entityType": "producerId",
+              "about": "The producer id associated with the aborted 
transaction." },
+            { "name": "FirstOffset", "type": "int64", "versions": "4+",
+              "about": "The first offset in the aborted transaction." }
+          ]},
+          { "name": "PreferredReadReplica", "type": "int32", "versions": 
"11+", "default": "-1", "ignorable": true,

Review comment:
       I'm wondering if this should be ignorable. When this is set, the leader 
returns no data, so it relies crucially on the follower redirecting.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

