mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r462364719



##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
-                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long producerId = abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.data = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        return data.toStruct(version);
     }
 
     @Override
-    protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
-
-        // write the total size and the response header
-        ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
-        buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
-        responseHeaderStruct.writeTo(buffer);
+    public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        int totalRecordSize = data.responses().stream()
+                .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+                .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+                .sum();
+        int totalMessageSize = data.size(cache, apiVersion);
+
+        RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add);
+        data.write(writer, cache, apiVersion);
+        writer.flush();

Review comment:
       Yea, I agree. @cmccabe had a suggestion about adding `Writable#close`, which would achieve the same goal. I think this would be nice and would clean things up a bit. I'll open a follow-up PR for this.
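   A rough sketch of what that might look like (assumed shape only; the actual follow-up may do this differently):
   
   ```java
   public interface Writable {
       void writeByte(byte val);
       // ...other write methods elided...
   
       // Default no-op, so only buffering implementations need to override it;
       // RecordsWriter would flush its pending Sends here.
       default void close() {
       }
   }
   ```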

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##########
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
                          LinkedHashMap<TopicPartition, PartitionData<T>> responseData,
                          int throttleTimeMs,
                          int sessionId) {
-        this.error = error;
-        this.responseData = responseData;
-        this.throttleTimeMs = throttleTimeMs;
-        this.sessionId = sessionId;
+        this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        this.responseDataMap = responseData;
     }
 
-    public static FetchResponse<MemoryRecords> parse(Struct struct) {
-        LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>();
-        for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-            Struct topicResponse = (Struct) topicResponseObj;
-            String topic = topicResponse.get(TOPIC_NAME);
-            for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionResponse = (Struct) partitionResponseObj;
-                Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-                int partition = partitionResponseHeader.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-                long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK);
-                long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET);
-                long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-                Optional<Integer> preferredReadReplica = Optional.of(
-                    partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID)
-                ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-                BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-                if (!(baseRecords instanceof MemoryRecords))
-                    throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass());
-                MemoryRecords records = (MemoryRecords) baseRecords;
-
-                List<AbortedTransaction> abortedTransactions = null;
-                if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-                    Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-                    if (abortedTransactionsArray != null) {
-                        abortedTransactions = new ArrayList<>(abortedTransactionsArray.length);
-                        for (Object abortedTransactionObj : abortedTransactionsArray) {
-                            Struct abortedTransactionStruct = (Struct) abortedTransactionObj;
-                            long producerId = abortedTransactionStruct.get(PRODUCER_ID);
-                            long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET);
-                            abortedTransactions.add(new AbortedTransaction(producerId, firstOffset));
-                        }
-                    }
-                }
-
-                PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset,
-                        logStartOffset, preferredReadReplica, abortedTransactions, records);
-                responseData.put(new TopicPartition(topic, partition), partitionData);
-            }
-        }
-        return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData,
-                struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+    public FetchResponse(FetchResponseData fetchResponseData) {
+        this.data = fetchResponseData;
+        this.responseDataMap = toResponseDataMap(fetchResponseData);
     }
 
     @Override
     public Struct toStruct(short version) {
-        return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId);
+        return data.toStruct(version);
     }
 
     @Override
-    protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
-        Struct responseHeaderStruct = responseHeader.toStruct();
-        Struct responseBodyStruct = toStruct(apiVersion);
-
-        // write the total size and the response header
-        ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4);
-        buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf());
-        responseHeaderStruct.writeTo(buffer);
+    public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) {
+        // Generate the Sends for the response fields and records
+        ArrayDeque<Send> sends = new ArrayDeque<>();
+        ObjectSerializationCache cache = new ObjectSerializationCache();
+        int totalRecordSize = data.responses().stream()
+                .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream())
+                .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes())
+                .sum();
+        int totalMessageSize = data.size(cache, apiVersion);
+
+        RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add);
+        data.write(writer, cache, apiVersion);
+        writer.flush();
+
+        // Compute the total size of all the Sends and write it out along with the header in the first Send
+        ResponseHeaderData responseHeaderData = responseHeader.data();
+
+        int headerSize = responseHeaderData.size(cache, responseHeader.headerVersion());
+        int bodySize = (int) sends.stream().mapToLong(Send::size).sum();

Review comment:
       Do you mean something like `Math.toIntExact`?
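   If so, a one-line sketch against the variables in the diff above:
   
   ```java
   // Math.toIntExact throws ArithmeticException on overflow instead of
   // silently truncating the way the (int) cast does.
   int bodySize = Math.toIntExact(sends.stream().mapToLong(Send::size).sum());
   ```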

##########
File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##########
@@ -146,7 +147,7 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Str
             case PRODUCE:
                 return new ProduceRequest(struct, apiVersion);
             case FETCH:
-                return new FetchRequest(struct, apiVersion);
+                return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion);

Review comment:
       I just wanted to remove the Struct constructor of FetchRequest completely. Eventually, `RequestContext#parseRequest(ByteBuffer)` will stop using Structs and pass the message data classes to `AbstractRequest#parseRequest` (or similar).
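   A hypothetical sketch of that end state (illustrative only; the eventual signature may differ):
   
   ```java
   // Hypothetical: parseRequest receives the generated message class directly,
   // so no Struct is materialized along the way.
   public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, FetchRequestData data) {
       return new FetchRequest(data, apiVersion);
   }
   ```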

##########
File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords}
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsReader implements Readable {
+    private final ByteBuffer buf;
+
+    public RecordsReader(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    @Override
+    public byte readByte() {
+        return buf.get();
+    }
+
+    @Override
+    public short readShort() {
+        return buf.getShort();
+    }
+
+    @Override
+    public int readInt() {
+        return buf.getInt();
+    }
+
+    @Override
+    public long readLong() {
+        return buf.getLong();
+    }
+
+    @Override
+    public double readDouble() {
+        return ByteUtils.readDouble(buf);
+    }
+
+    @Override
+    public void readArray(byte[] arr) {
+        buf.get(arr);
+    }
+
+    @Override
+    public int readUnsignedVarint() {
+        return ByteUtils.readUnsignedVarint(buf);
+    }
+
+    @Override
+    public ByteBuffer readByteBuffer(int length) {

Review comment:
       This is copied straight from ByteBufferAccessor and will probably go away in a follow-on PR. Either way, it seems it should always be in range, since this is used by zero-copy byte fields in the message classes, e.g.
   
   ```java
   int len = _reader.readInt();
   if (len > 0) {
     this.someZeroCopyField = _reader.readByteBuffer(len);
   }
   ```
   
   So generally it's probably safe. In the case of a corrupt message where the length is wrong, ByteBuffer#limit will throw an error and parsing will fail. It would probably be nice to put a range check in ByteBufferAccessor so we can throw a more useful error.
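   Something along these lines, for instance (a sketch based on the existing slice/limit logic; the exact check and exception type are assumptions):
   
   ```java
   @Override
   public ByteBuffer readByteBuffer(int length) {
       // Assumed range check: fail with a descriptive error instead of the
       // opaque exception ByteBuffer#limit would raise on a corrupt length.
       if (length > buf.remaining()) {
           throw new RuntimeException("Error reading byte buffer of " + length +
               " bytes: only " + buf.remaining() + " bytes remaining");
       }
       ByteBuffer res = buf.slice();
       res.limit(length);
       buf.position(buf.position() + length);
       return res;
   }
   ```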

##########
File path: clients/src/main/resources/common/message/FetchResponse.json
##########
@@ -43,37 +43,39 @@
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", 
"ignorable": true,
       "about": "The duration in milliseconds for which the request was 
throttled due to a quota violation, or zero if the request did not violate any 
quota." },
-    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": 
false,
+    { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": 
true,
       "about": "The top level response error code." },
     { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", 
"ignorable": false,
       "about": "The fetch session ID, or 0 if this is not part of a fetch 
session." },
-    { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+",
+    { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": 
"0+",
       "about": "The response topics.", "fields": [
-      { "name": "Name", "type": "string", "versions": "0+", "entityType": 
"topicName",
+      { "name": "Topic", "type": "string", "versions": "0+", "entityType": 
"topicName",
         "about": "The topic name." },
-      { "name": "Partitions", "type": "[]FetchablePartitionResponse", 
"versions": "0+",
+      { "name": "PartitionResponses", "type": "[]FetchablePartitionResponse", 
"versions": "0+",
         "about": "The topic partitions.", "fields": [
-        { "name": "PartitionIndex", "type": "int32", "versions": "0+",
-          "about": "The partiiton index." },
-        { "name": "ErrorCode", "type": "int16", "versions": "0+",
-          "about": "The error code, or 0 if there was no fetch error." },
-        { "name": "HighWatermark", "type": "int64", "versions": "0+",
-          "about": "The current high water mark." },
-        { "name": "LastStableOffset", "type": "int64", "versions": "4+", 
"default": "-1", "ignorable": true,
-          "about": "The last stable offset (or LSO) of the partition. This is 
the last offset such that the state of all transactional records prior to this 
offset have been decided (ABORTED or COMMITTED)" },
-        { "name": "LogStartOffset", "type": "int64", "versions": "5+", 
"default": "-1", "ignorable": true,
-          "about": "The current log start offset." },
-        { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", 
"nullableVersions": "4+", "ignorable": false,
-          "about": "The aborted transactions.",  "fields": [
-          { "name": "ProducerId", "type": "int64", "versions": "4+", 
"entityType": "producerId",
-            "about": "The producer id associated with the aborted 
transaction." },
-          { "name": "FirstOffset", "type": "int64", "versions": "4+",
-            "about": "The first offset in the aborted transaction." }
+        { "name":  "PartitionHeader", "type": "PartitionHeader", "versions": 
"0+",
+          "fields":  [
+          { "name": "Partition", "type": "int32", "versions": "0+",
+            "about": "The partition index." },
+          { "name": "ErrorCode", "type": "int16", "versions": "0+",
+            "about": "The error code, or 0 if there was no fetch error." },
+          { "name": "HighWatermark", "type": "int64", "versions": "0+",
+            "about": "The current high water mark." },
+          { "name": "LastStableOffset", "type": "int64", "versions": "4+", 
"default": "-1", "ignorable": true,
+            "about": "The last stable offset (or LSO) of the partition. This 
is the last offset such that the state of all transactional records prior to 
this offset have been decided (ABORTED or COMMITTED)" },
+          { "name": "LogStartOffset", "type": "int64", "versions": "5+", 
"default": "-1", "ignorable": true,
+            "about": "The current log start offset." },
+          { "name": "AbortedTransactions", "type": "[]AbortedTransaction", 
"versions": "4+", "nullableVersions": "4+", "ignorable": true,
+            "about": "The aborted transactions.",  "fields": [
+            { "name": "ProducerId", "type": "int64", "versions": "4+", 
"entityType": "producerId",
+              "about": "The producer id associated with the aborted 
transaction." },
+            { "name": "FirstOffset", "type": "int64", "versions": "4+",
+              "about": "The first offset in the aborted transaction." }
+          ]},
+          { "name": "PreferredReadReplica", "type": "int32", "versions": 
"11+", "default": "-1", "ignorable": true,

Review comment:
       I see what you mean. If we have a bug that causes us to hit the preferred replica code for an older api version, we should fail to serialize the message rather than send it to a client that doesn't understand follower redirection.
   
   Good catch.
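   For reference, roughly the guard a non-ignorable field gets in the generated serializer (a sketch of the assumed shape, not the literal generated code):
   
   ```java
   // With "ignorable": false, writing a non-default PreferredReadReplica at a
   // pre-v11 version fails fast instead of being silently dropped.
   if (version < 11 && preferredReadReplica != -1) {
       throw new UnsupportedVersionException(
           "Attempted to write a non-default preferredReadReplica at version " + version);
   }
   ```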

##########
File path: clients/src/main/resources/common/message/FetchRequest.json
##########
@@ -55,35 +55,35 @@
       "about": "The minimum bytes to accumulate in the response." },
     { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": 
"0x7fffffff", "ignorable": true,
       "about": "The maximum bytes to fetch.  See KIP-74 for cases where this 
limit may not be honored." },
-    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": 
"0", "ignorable": false,
+    { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": 
"0", "ignorable": true,

Review comment:
       I changed this to make the JSON schema match what was previously in FetchRequest.java. During serialization, we would simply stick the isolation level in the Struct regardless of the api version:
   
   ```java
   struct.setIfExists(ISOLATION_LEVEL, isolationLevel.id());
   ```
   
   So even if we were writing out a v3 FetchRequest, whatever value we put here would be ignored and not sent out. There were also some unit tests that relied on this behavior.
   
   Your assessment sounds correct, though, so it probably doesn't matter whether it's ignorable or not.
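   For comparison, the generated writer gives an ignorable field the same silent-drop behavior as `setIfExists` (again a sketch of the assumed shape):
   
   ```java
   // With "ignorable": true, the field is simply skipped at versions that
   // don't support it, mirroring Struct#setIfExists above.
   if (version >= 4) {
       writer.writeByte(isolationLevel);
   }
   ```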




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

