mumrah commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r462364719
########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ########## @@ -366,225 +255,128 @@ public FetchResponse(Errors error, LinkedHashMap<TopicPartition, PartitionData<T>> responseData, int throttleTimeMs, int sessionId) { - this.error = error; - this.responseData = responseData; - this.throttleTimeMs = throttleTimeMs; - this.sessionId = sessionId; + this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); + this.responseDataMap = responseData; } - public static FetchResponse<MemoryRecords> parse(Struct struct) { - LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>(); - for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.get(TOPIC_NAME); - for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { - Struct partitionResponse = (Struct) partitionResponseObj; - Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME); - int partition = partitionResponseHeader.get(PARTITION_ID); - Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE)); - long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK); - long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET); - long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET); - Optional<Integer> preferredReadReplica = Optional.of( - partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID) - ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate()); - - BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME); - if (!(baseRecords instanceof MemoryRecords)) - throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass()); - MemoryRecords records = (MemoryRecords) baseRecords; - - List<AbortedTransaction> abortedTransactions = null; - if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) { - Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME); - if (abortedTransactionsArray != null) { - abortedTransactions = new ArrayList<>(abortedTransactionsArray.length); - for (Object abortedTransactionObj : abortedTransactionsArray) { - Struct abortedTransactionStruct = (Struct) abortedTransactionObj; - long producerId = abortedTransactionStruct.get(PRODUCER_ID); - long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET); - abortedTransactions.add(new AbortedTransaction(producerId, firstOffset)); - } - } - } - - PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset, - logStartOffset, preferredReadReplica, abortedTransactions, records); - responseData.put(new TopicPartition(topic, partition), partitionData); - } - } - return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData, - struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID)); + public FetchResponse(FetchResponseData fetchResponseData) { + this.data = fetchResponseData; + this.responseDataMap = toResponseDataMap(fetchResponseData); } @Override public Struct toStruct(short version) { - return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); + return data.toStruct(version); } @Override - protected Send 
toSend(String dest, ResponseHeader responseHeader, short apiVersion) { - Struct responseHeaderStruct = responseHeader.toStruct(); - Struct responseBodyStruct = toStruct(apiVersion); - - // write the total size and the response header - ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4); - buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf()); - responseHeaderStruct.writeTo(buffer); + public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) { + // Generate the Sends for the response fields and records + ArrayDeque<Send> sends = new ArrayDeque<>(); + ObjectSerializationCache cache = new ObjectSerializationCache(); + int totalRecordSize = data.responses().stream() + .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream()) + .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes()) + .sum(); + int totalMessageSize = data.size(cache, apiVersion); + + RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add); + data.write(writer, cache, apiVersion); + writer.flush(); Review comment: Yea, I agree. @cmccabe had a suggestion about adding `Writable#close` which would achieve the same goal. I think this would be nice and clean things up a bit. I'll open a follow up PR for this ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ########## @@ -366,225 +255,128 @@ public FetchResponse(Errors error, LinkedHashMap<TopicPartition, PartitionData<T>> responseData, int throttleTimeMs, int sessionId) { - this.error = error; - this.responseData = responseData; - this.throttleTimeMs = throttleTimeMs; - this.sessionId = sessionId; + this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); + this.responseDataMap = responseData; } - public static FetchResponse<MemoryRecords> parse(Struct struct) { - LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>(); - for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.get(TOPIC_NAME); - for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { - Struct partitionResponse = (Struct) partitionResponseObj; - Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME); - int partition = partitionResponseHeader.get(PARTITION_ID); - Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE)); - long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK); - long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET); - long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET); - Optional<Integer> preferredReadReplica = Optional.of( - partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID) - ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate()); - - BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME); - if (!(baseRecords instanceof MemoryRecords)) - throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass()); - MemoryRecords records = (MemoryRecords) baseRecords; - - List<AbortedTransaction> abortedTransactions = null; - if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) { - Object[] abortedTransactionsArray = 
partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME); - if (abortedTransactionsArray != null) { - abortedTransactions = new ArrayList<>(abortedTransactionsArray.length); - for (Object abortedTransactionObj : abortedTransactionsArray) { - Struct abortedTransactionStruct = (Struct) abortedTransactionObj; - long producerId = abortedTransactionStruct.get(PRODUCER_ID); - long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET); - abortedTransactions.add(new AbortedTransaction(producerId, firstOffset)); - } - } - } - - PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset, - logStartOffset, preferredReadReplica, abortedTransactions, records); - responseData.put(new TopicPartition(topic, partition), partitionData); - } - } - return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData, - struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID)); + public FetchResponse(FetchResponseData fetchResponseData) { + this.data = fetchResponseData; + this.responseDataMap = toResponseDataMap(fetchResponseData); } @Override public Struct toStruct(short version) { - return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); + return data.toStruct(version); } @Override - protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) { - Struct responseHeaderStruct = responseHeader.toStruct(); - Struct responseBodyStruct = toStruct(apiVersion); - - // write the total size and the response header - ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4); - buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf()); - responseHeaderStruct.writeTo(buffer); + public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) { + // Generate the Sends for the response fields and records + ArrayDeque<Send> sends = new ArrayDeque<>(); + ObjectSerializationCache cache = new ObjectSerializationCache(); + int totalRecordSize = data.responses().stream() + .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream()) + .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes()) + .sum(); + int totalMessageSize = data.size(cache, apiVersion); + + RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add); + data.write(writer, cache, apiVersion); + writer.flush(); + + // Compute the total size of all the Sends and write it out along with the header in the first Send + ResponseHeaderData responseHeaderData = responseHeader.data(); + + int headerSize = responseHeaderData.size(cache, responseHeader.headerVersion()); + int bodySize = (int) sends.stream().mapToLong(Send::size).sum(); Review comment: Do you mean something like `Math.toIntExact`? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ########## @@ -146,7 +147,7 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Str case PRODUCE: return new ProduceRequest(struct, apiVersion); case FETCH: - return new FetchRequest(struct, apiVersion); + return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion); Review comment: I just wanted to remove the Struct constructor of FetchRequest completely. 
Eventually, `RequestContext#parseRequest(ByteBuffer)` will stop using Structs and pass the message data classes to `AbstractRequest#parseRequest` (or similar). ########## File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.record.BaseRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.utils.ByteUtils; + +import java.nio.ByteBuffer; + +/** + * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords} + * + * @see org.apache.kafka.common.requests.FetchResponse + */ +public class RecordsReader implements Readable { + private final ByteBuffer buf; + + public RecordsReader(ByteBuffer buf) { + this.buf = buf; + } + + @Override + public byte readByte() { + return buf.get(); + } + + @Override + public short readShort() { + return buf.getShort(); + } + + @Override + public int readInt() { + return buf.getInt(); + } + + @Override + public long readLong() { + return buf.getLong(); + } + + @Override + public double readDouble() { + return ByteUtils.readDouble(buf); + } + + @Override + public void readArray(byte[] arr) { + buf.get(arr); + } + + @Override + public int readUnsignedVarint() { + return ByteUtils.readUnsignedVarint(buf); + } + + @Override + public ByteBuffer readByteBuffer(int length) { Review comment: This is copied straight from ByteBufferAccessor and will probably go away in a follow-on PR. But either way, looking at it it seems it should always be in range since this is used by zero-copy byte fields in the message classes, e.g. ``` int len = _reader.readInt(); if (len > 0) { this.someZeroCopyField = _reader.readByteBuffer(len); } ``` So generally it's probably safe. In the case of a corrupt message where the length is wrong, ByteBuffer#limit will throw an error and parsing will fail. It probably would be nice to put a range check in ByteBufferAccessor so we can throw a more useful error. ########## File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.record.BaseRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.utils.ByteUtils; + +import java.nio.ByteBuffer; + +/** + * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords} + * + * @see org.apache.kafka.common.requests.FetchResponse + */ +public class RecordsReader implements Readable { + private final ByteBuffer buf; + + public RecordsReader(ByteBuffer buf) { + this.buf = buf; + } + + @Override + public byte readByte() { + return buf.get(); + } + + @Override + public short readShort() { + return buf.getShort(); + } + + @Override + public int readInt() { + return buf.getInt(); + } + + @Override + public long readLong() { + return buf.getLong(); + } + + @Override + public double readDouble() { + return ByteUtils.readDouble(buf); + } + + @Override + public void readArray(byte[] arr) { + buf.get(arr); + } + + @Override + public int readUnsignedVarint() { + return ByteUtils.readUnsignedVarint(buf); + } + + @Override + public ByteBuffer readByteBuffer(int length) { Review comment: This is copied straight from ByteBufferAccessor and will probably go away in a follow-on PR. But either way, looking at it it seems it should always be in range since this is used by zero-copy byte fields in the message classes, e.g. ```java int len = _reader.readInt(); if (len > 0) { this.someZeroCopyField = _reader.readByteBuffer(len); } ``` So generally it's probably safe. In the case of a corrupt message where the length is wrong, ByteBuffer#limit will throw an error and parsing will fail. It probably would be nice to put a range check in ByteBufferAccessor so we can throw a more useful error. ########## File path: clients/src/main/resources/common/message/FetchResponse.json ########## @@ -43,37 +43,39 @@ "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, - { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false, + { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": true, "about": "The top level response error code." }, { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false, "about": "The fetch session ID, or 0 if this is not part of a fetch session." }, - { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+", + { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+", "about": "The response topics.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." 
}, - { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+", + { "name": "PartitionResponses", "type": "[]FetchablePartitionResponse", "versions": "0+", "about": "The topic partitions.", "fields": [ - { "name": "PartitionIndex", "type": "int32", "versions": "0+", - "about": "The partiiton index." }, - { "name": "ErrorCode", "type": "int16", "versions": "0+", - "about": "The error code, or 0 if there was no fetch error." }, - { "name": "HighWatermark", "type": "int64", "versions": "0+", - "about": "The current high water mark." }, - { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true, - "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" }, - { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, - "about": "The current log start offset." }, - { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false, - "about": "The aborted transactions.", "fields": [ - { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId", - "about": "The producer id associated with the aborted transaction." }, - { "name": "FirstOffset", "type": "int64", "versions": "4+", - "about": "The first offset in the aborted transaction." } + { "name": "PartitionHeader", "type": "PartitionHeader", "versions": "0+", + "fields": [ + { "name": "Partition", "type": "int32", "versions": "0+", + "about": "The partition index." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no fetch error." }, + { "name": "HighWatermark", "type": "int64", "versions": "0+", + "about": "The current high water mark." }, + { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true, + "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" }, + { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, + "about": "The current log start offset." }, + { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true, + "about": "The aborted transactions.", "fields": [ + { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId", + "about": "The producer id associated with the aborted transaction." }, + { "name": "FirstOffset", "type": "int64", "versions": "4+", + "about": "The first offset in the aborted transaction." } + ]}, + { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": true, Review comment: I see what you mean. If we have a bug that causes us to hit the preferred replica code for an older api version, we should fail to serialize the message rather than sending it to a client that doesn't understand follower redirection. Good catch. ########## File path: clients/src/main/resources/common/message/FetchRequest.json ########## @@ -55,35 +55,35 @@ "about": "The minimum bytes to accumulate in the response." }, { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true, "about": "The maximum bytes to fetch. 
See KIP-74 for cases where this limit may not be honored." }, - { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false, + { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true, Review comment: I changed this to make the JSON schema match what was previously in FetchRequest.java. During serialization, we would simply stick the isolation level in the Struct regardless of the api version: ```java struct.setIfExists(ISOLATION_LEVEL, isolationLevel.id()); ``` So even if we were writing out a v3 FetchRequest, whatever value we put here would be ignored and not sent out. There were also some unit tests that utilized this behavior. Your assessment sounds correct though, so it probably doesn't matter whether it's ignorable or not.
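For readers following the `ignorable` discussion in the two schema comments above, here is a rough sketch of the semantics being debated: when a message is serialized at a version that does not support a field, an ignorable field is silently dropped, while a non-ignorable field carrying a non-default value should make serialization fail. This is a hand-written illustration only, not the actual generated `FetchRequestData`/`FetchResponseData` code; the method names and structure are invented for the example.

```java
// Illustrative sketch of ignorable vs. non-ignorable field handling during
// version-gated serialization. Not the generated message code; names are hypothetical.
import org.apache.kafka.common.errors.UnsupportedVersionException;

public class IgnorableFieldExample {

    // PreferredReadReplica exists only in v11+ and is non-ignorable, so writing a
    // non-default value at an older version should fail loudly rather than silently
    // dropping the follower-redirection information.
    static void writePreferredReadReplica(int preferredReadReplica, short version) {
        if (version >= 11) {
            // writable.writeInt(preferredReadReplica);
        } else if (preferredReadReplica != -1) {
            throw new UnsupportedVersionException(
                "Attempted to write a non-default preferredReadReplica at version " + version);
        }
        // else: default value at an old version, nothing to write
    }

    // IsolationLevel exists only in v4+; if marked ignorable, a non-default value at an
    // older version is simply not written, mirroring the old struct.setIfExists(...) behavior.
    static void writeIsolationLevel(byte isolationLevel, short version) {
        if (version >= 4) {
            // writable.writeByte(isolationLevel);
        }
        // else: silently ignored for old versions
    }
}
```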