hachikuji commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r461173162
########## File path: clients/src/main/resources/common/message/FetchRequest.json ########## @@ -55,35 +55,35 @@ "about": "The minimum bytes to accumulate in the response." }, { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fffffff", "ignorable": true, "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." }, - { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false, + { "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true, Review comment: I guess the implicit expectation is that if the protocol does not support the `read_committed` isolation level, then it wouldn't have transactional data anyway, so reverting to `read_uncommitted` is safe. Can't find a fault with that. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ########## @@ -146,7 +147,7 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Str case PRODUCE: return new ProduceRequest(struct, apiVersion); case FETCH: - return new FetchRequest(struct, apiVersion); + return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion); Review comment: nit: any reason not to stick with the same constructor convention as the other requests? ########## File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.network.ByteBufferSend; +import org.apache.kafka.common.network.Send; +import org.apache.kafka.common.record.BaseRecords; +import org.apache.kafka.common.utils.ByteUtils; + +import java.io.DataOutput; +import java.nio.ByteBuffer; +import java.util.function.Consumer; + +/** + * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer + * of data from a record-set's file channel to the eventual socket channel. + * + * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array + * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written + * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes, + * another Send is passed to the consumer which wraps the underlying record-set's transfer logic. + * + * For example, + * + * <pre> + * recordsWritable.writeInt(10); + * recordsWritable.writeRecords(records1); + * recordsWritable.writeInt(20); + * recordsWritable.writeRecords(records2); + * recordsWritable.writeInt(30); + * recordsWritable.flush(); + * </pre> + * + * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any + * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. 
This class is + * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}. + * + * @see org.apache.kafka.common.requests.FetchResponse + */ +public class RecordsWriter implements Writable { + private final String dest; + private final Consumer<Send> sendConsumer; + private final ByteBuffer buffer; + private int mark; + + public RecordsWriter(String dest, int totalSize, Consumer<Send> sendConsumer) { Review comment: Could we rename `totalSize` so that it is clear that it does not cover the record sizes. Maybe `totalOverheadSize` or `totalNonRecordSize` or something like that. ########## File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java ########## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.network.ByteBufferSend; +import org.apache.kafka.common.network.Send; +import org.apache.kafka.common.record.BaseRecords; +import org.apache.kafka.common.utils.ByteUtils; + +import java.io.DataOutput; +import java.nio.ByteBuffer; +import java.util.function.Consumer; + +/** + * Implementation of Writable which produces a sequence of {@link Send} objects. 
This allows for deferring the transfer + * of data from a record-set's file channel to the eventual socket channel. + * + * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array + * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written + * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes, + * another Send is passed to the consumer which wraps the underlying record-set's transfer logic. + * + * For example, + * + * <pre> + * recordsWritable.writeInt(10); + * recordsWritable.writeRecords(records1); + * recordsWritable.writeInt(20); + * recordsWritable.writeRecords(records2); + * recordsWritable.writeInt(30); + * recordsWritable.flush(); + * </pre> + * + * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any + * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is + * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}. + * + * @see org.apache.kafka.common.requests.FetchResponse + */ +public class RecordsWriter implements Writable { Review comment: nit: wonder if this should be `RecordsWritable` for consistency with `Writable`. ########## File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsReader.java ########## @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.record.BaseRecords; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.utils.ByteUtils; + +import java.nio.ByteBuffer; + +/** + * Implementation of Readable which reads from a byte buffer and can read records as {@link MemoryRecords} + * + * @see org.apache.kafka.common.requests.FetchResponse + */ +public class RecordsReader implements Readable { + private final ByteBuffer buf; + + public RecordsReader(ByteBuffer buf) { + this.buf = buf; + } + + @Override + public byte readByte() { + return buf.get(); + } + + @Override + public short readShort() { + return buf.getShort(); + } + + @Override + public int readInt() { + return buf.getInt(); + } + + @Override + public long readLong() { + return buf.getLong(); + } + + @Override + public double readDouble() { + return ByteUtils.readDouble(buf); + } + + @Override + public void readArray(byte[] arr) { + buf.get(arr); + } + + @Override + public int readUnsignedVarint() { + return ByteUtils.readUnsignedVarint(buf); + } + + @Override + public ByteBuffer readByteBuffer(int length) { Review comment: More of a side question, but is this length guaranteed to be less than the buffer size? Wondering if it is worth adding range checking. 
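For reference, a minimal sketch of the kind of range check the comment above asks about. It is not taken from the PR: the class name `BoundedBufferReads`, the static helper shape, and the choice of `IllegalArgumentException` are assumptions made for illustration, and the slicing logic is only one plausible way `readByteBuffer` could be written.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of the PR: shows a bounds check before slicing.
final class BoundedBufferReads {
    private BoundedBufferReads() { }

    // Returns a view of the next `length` bytes of `buf` and advances `buf` past them.
    // Rejects lengths that are negative or exceed the bytes remaining in `buf`.
    static ByteBuffer readByteBuffer(ByteBuffer buf, int length) {
        int remaining = buf.remaining();
        if (length < 0 || length > remaining) {
            throw new IllegalArgumentException("Requested length " + length
                + " is outside the valid range [0, " + remaining + "]");
        }
        ByteBuffer res = buf.slice();            // view starting at the current position
        res.limit(length);                       // restrict the view to `length` bytes
        buf.position(buf.position() + length);   // consume those bytes from the source
        return res;
    }
}
```

Without the check, an oversized length would surface as an `IllegalArgumentException` from `limit` or `position` with a less descriptive message, so the main benefit here is a clearer error at the point of the bad read.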
########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ########## @@ -366,225 +255,128 @@ public FetchResponse(Errors error, LinkedHashMap<TopicPartition, PartitionData<T>> responseData, int throttleTimeMs, int sessionId) { - this.error = error; - this.responseData = responseData; - this.throttleTimeMs = throttleTimeMs; - this.sessionId = sessionId; + this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); + this.responseDataMap = responseData; } - public static FetchResponse<MemoryRecords> parse(Struct struct) { - LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>(); - for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.get(TOPIC_NAME); - for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { - Struct partitionResponse = (Struct) partitionResponseObj; - Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME); - int partition = partitionResponseHeader.get(PARTITION_ID); - Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE)); - long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK); - long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET); - long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET); - Optional<Integer> preferredReadReplica = Optional.of( - partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID) - ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate()); - - BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME); - if (!(baseRecords instanceof MemoryRecords)) - throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass()); - MemoryRecords records = (MemoryRecords) 
baseRecords; - - List<AbortedTransaction> abortedTransactions = null; - if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) { - Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME); - if (abortedTransactionsArray != null) { - abortedTransactions = new ArrayList<>(abortedTransactionsArray.length); - for (Object abortedTransactionObj : abortedTransactionsArray) { - Struct abortedTransactionStruct = (Struct) abortedTransactionObj; - long producerId = abortedTransactionStruct.get(PRODUCER_ID); - long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET); - abortedTransactions.add(new AbortedTransaction(producerId, firstOffset)); - } - } - } - - PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset, - logStartOffset, preferredReadReplica, abortedTransactions, records); - responseData.put(new TopicPartition(topic, partition), partitionData); - } - } - return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData, - struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID)); + public FetchResponse(FetchResponseData fetchResponseData) { + this.data = fetchResponseData; + this.responseDataMap = toResponseDataMap(fetchResponseData); } @Override public Struct toStruct(short version) { - return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); + return data.toStruct(version); } @Override - protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) { - Struct responseHeaderStruct = responseHeader.toStruct(); - Struct responseBodyStruct = toStruct(apiVersion); - - // write the total size and the response header - ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4); - buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf()); - responseHeaderStruct.writeTo(buffer); + public Send 
toSend(String dest, ResponseHeader responseHeader, short apiVersion) { + // Generate the Sends for the response fields and records + ArrayDeque<Send> sends = new ArrayDeque<>(); + ObjectSerializationCache cache = new ObjectSerializationCache(); + int totalRecordSize = data.responses().stream() + .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream()) + .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes()) + .sum(); + int totalMessageSize = data.size(cache, apiVersion); + + RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add); + data.write(writer, cache, apiVersion); + writer.flush(); Review comment: nit: not a big deal, but I feel like calling `flush` should really be the responsibility of `write`. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ########## @@ -366,225 +255,128 @@ public FetchResponse(Errors error, LinkedHashMap<TopicPartition, PartitionData<T>> responseData, int throttleTimeMs, int sessionId) { - this.error = error; - this.responseData = responseData; - this.throttleTimeMs = throttleTimeMs; - this.sessionId = sessionId; + this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); + this.responseDataMap = responseData; } - public static FetchResponse<MemoryRecords> parse(Struct struct) { - LinkedHashMap<TopicPartition, PartitionData<MemoryRecords>> responseData = new LinkedHashMap<>(); - for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { - Struct topicResponse = (Struct) topicResponseObj; - String topic = topicResponse.get(TOPIC_NAME); - for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { - Struct partitionResponse = (Struct) partitionResponseObj; - Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME); - int partition = partitionResponseHeader.get(PARTITION_ID); - Errors 
error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE)); - long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK); - long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET); - long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET); - Optional<Integer> preferredReadReplica = Optional.of( - partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID) - ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate()); - - BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME); - if (!(baseRecords instanceof MemoryRecords)) - throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass()); - MemoryRecords records = (MemoryRecords) baseRecords; - - List<AbortedTransaction> abortedTransactions = null; - if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) { - Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME); - if (abortedTransactionsArray != null) { - abortedTransactions = new ArrayList<>(abortedTransactionsArray.length); - for (Object abortedTransactionObj : abortedTransactionsArray) { - Struct abortedTransactionStruct = (Struct) abortedTransactionObj; - long producerId = abortedTransactionStruct.get(PRODUCER_ID); - long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET); - abortedTransactions.add(new AbortedTransaction(producerId, firstOffset)); - } - } - } - - PartitionData<MemoryRecords> partitionData = new PartitionData<>(error, highWatermark, lastStableOffset, - logStartOffset, preferredReadReplica, abortedTransactions, records); - responseData.put(new TopicPartition(topic, partition), partitionData); - } - } - return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData, - struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, 
INVALID_SESSION_ID)); + public FetchResponse(FetchResponseData fetchResponseData) { + this.data = fetchResponseData; + this.responseDataMap = toResponseDataMap(fetchResponseData); } @Override public Struct toStruct(short version) { - return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); + return data.toStruct(version); } @Override - protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) { - Struct responseHeaderStruct = responseHeader.toStruct(); - Struct responseBodyStruct = toStruct(apiVersion); - - // write the total size and the response header - ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4); - buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf()); - responseHeaderStruct.writeTo(buffer); + public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) { + // Generate the Sends for the response fields and records + ArrayDeque<Send> sends = new ArrayDeque<>(); + ObjectSerializationCache cache = new ObjectSerializationCache(); + int totalRecordSize = data.responses().stream() + .flatMap(fetchableTopicResponse -> fetchableTopicResponse.partitionResponses().stream()) + .mapToInt(fetchablePartitionResponse -> fetchablePartitionResponse.recordSet().sizeInBytes()) + .sum(); + int totalMessageSize = data.size(cache, apiVersion); + + RecordsWriter writer = new RecordsWriter(dest, totalMessageSize - totalRecordSize, sends::add); + data.write(writer, cache, apiVersion); + writer.flush(); + + // Compute the total size of all the Sends and write it out along with the header in the first Send + ResponseHeaderData responseHeaderData = responseHeader.data(); + + int headerSize = responseHeaderData.size(cache, responseHeader.headerVersion()); + int bodySize = (int) sends.stream().mapToLong(Send::size).sum(); Review comment: Instead of the cast, could we add a validation check? 
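As an illustration of the validation suggested above, here is a minimal sketch that replaces the unchecked `(int)` cast with an explicit range check. The class `SendSizes` and method `checkedTotalSize` are hypothetical names, and the sizes are passed as a plain `long[]` rather than a collection of `Send` objects so that the example stays self-contained.

```java
import java.util.stream.LongStream;

// Hypothetical helper, not part of the PR: validates a summed size before narrowing to int.
final class SendSizes {
    private SendSizes() { }

    // Sums the given sizes and returns the total as an int,
    // failing loudly if the total does not fit instead of silently truncating.
    static int checkedTotalSize(long[] sendSizes) {
        long total = LongStream.of(sendSizes).sum();
        if (total < 0 || total > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                "Total size " + total + " does not fit in a 32-bit size field");
        }
        return (int) total; // safe: range verified above
    }
}
```

`Math.toIntExact(long)` from the standard library would do a similar job, throwing `ArithmeticException` on overflow; the explicit check is shown only because it allows a more specific error message.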
########## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequestResponse.java ########## @@ -16,5 +16,16 @@ */ package org.apache.kafka.common.requests; +import org.apache.kafka.common.protocol.ApiMessage; + public interface AbstractRequestResponse { + /** + * Return the auto-generated `Message` instance if this request/response relies on one for + * serialization/deserialization. If this class has not yet been updated to rely on the auto-generated protocol + * classes, return `null`. + * @return + */ + default ApiMessage data() { Review comment: @mumrah Do we need this for this PR or can we leave this for #7409? ########## File path: clients/src/main/resources/common/message/FetchResponse.json ########## @@ -43,37 +43,39 @@ "fields": [ { "name": "ThrottleTimeMs", "type": "int32", "versions": "1+", "ignorable": true, "about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." }, - { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": false, + { "name": "ErrorCode", "type": "int16", "versions": "7+", "ignorable": true, "about": "The top level response error code." }, { "name": "SessionId", "type": "int32", "versions": "7+", "default": "0", "ignorable": false, "about": "The fetch session ID, or 0 if this is not part of a fetch session." }, - { "name": "Topics", "type": "[]FetchableTopicResponse", "versions": "0+", + { "name": "Responses", "type": "[]FetchableTopicResponse", "versions": "0+", "about": "The response topics.", "fields": [ - { "name": "Name", "type": "string", "versions": "0+", "entityType": "topicName", + { "name": "Topic", "type": "string", "versions": "0+", "entityType": "topicName", "about": "The topic name." 
}, - { "name": "Partitions", "type": "[]FetchablePartitionResponse", "versions": "0+", + { "name": "PartitionResponses", "type": "[]FetchablePartitionResponse", "versions": "0+", "about": "The topic partitions.", "fields": [ - { "name": "PartitionIndex", "type": "int32", "versions": "0+", - "about": "The partiiton index." }, - { "name": "ErrorCode", "type": "int16", "versions": "0+", - "about": "The error code, or 0 if there was no fetch error." }, - { "name": "HighWatermark", "type": "int64", "versions": "0+", - "about": "The current high water mark." }, - { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true, - "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" }, - { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, - "about": "The current log start offset." }, - { "name": "Aborted", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": false, - "about": "The aborted transactions.", "fields": [ - { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId", - "about": "The producer id associated with the aborted transaction." }, - { "name": "FirstOffset", "type": "int64", "versions": "4+", - "about": "The first offset in the aborted transaction." } + { "name": "PartitionHeader", "type": "PartitionHeader", "versions": "0+", + "fields": [ + { "name": "Partition", "type": "int32", "versions": "0+", + "about": "The partition index." }, + { "name": "ErrorCode", "type": "int16", "versions": "0+", + "about": "The error code, or 0 if there was no fetch error." }, + { "name": "HighWatermark", "type": "int64", "versions": "0+", + "about": "The current high water mark." 
}, + { "name": "LastStableOffset", "type": "int64", "versions": "4+", "default": "-1", "ignorable": true, + "about": "The last stable offset (or LSO) of the partition. This is the last offset such that the state of all transactional records prior to this offset have been decided (ABORTED or COMMITTED)" }, + { "name": "LogStartOffset", "type": "int64", "versions": "5+", "default": "-1", "ignorable": true, + "about": "The current log start offset." }, + { "name": "AbortedTransactions", "type": "[]AbortedTransaction", "versions": "4+", "nullableVersions": "4+", "ignorable": true, + "about": "The aborted transactions.", "fields": [ + { "name": "ProducerId", "type": "int64", "versions": "4+", "entityType": "producerId", + "about": "The producer id associated with the aborted transaction." }, + { "name": "FirstOffset", "type": "int64", "versions": "4+", + "about": "The first offset in the aborted transaction." } + ]}, + { "name": "PreferredReadReplica", "type": "int32", "versions": "11+", "default": "-1", "ignorable": true, Review comment: I'm wondering if this should be ignorable. When this is set, the leader returns no data, so it relies crucially on the follower redirection.
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org