hachikuji commented on a change in pull request #9401: URL: https://github.com/apache/kafka/pull/9401#discussion_r514326022
########## File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java ########## @@ -210,65 +142,42 @@ public String toString() { } } + /** + * We have to copy acks, timeout, transactionalId and partitionSizes from data since data maybe reset to eliminate + * the reference to ByteBuffer but those metadata are still useful. + */ private final short acks; private final int timeout; private final String transactionalId; - - private final Map<TopicPartition, Integer> partitionSizes; - + // visible for testing + final Map<TopicPartition, Integer> partitionSizes; + private boolean hasTransactionalRecords = false; + private boolean hasIdempotentRecords = false; // This is set to null by `clearPartitionRecords` to prevent unnecessary memory retention when a produce request is // put in the purgatory (due to client throttling, it can take a while before the response is sent). // Care should be taken in methods that use this field. - private volatile Map<TopicPartition, MemoryRecords> partitionRecords; - private boolean hasTransactionalRecords = false; - private boolean hasIdempotentRecords = false; - - private ProduceRequest(short version, short acks, int timeout, Map<TopicPartition, MemoryRecords> partitionRecords, String transactionalId) { - super(ApiKeys.PRODUCE, version); - this.acks = acks; - this.timeout = timeout; - - this.transactionalId = transactionalId; - this.partitionRecords = partitionRecords; - this.partitionSizes = createPartitionSizes(partitionRecords); + private volatile ProduceRequestData data; - for (MemoryRecords records : partitionRecords.values()) { - setFlags(records); - } - } - - private static Map<TopicPartition, Integer> createPartitionSizes(Map<TopicPartition, MemoryRecords> partitionRecords) { - Map<TopicPartition, Integer> result = new HashMap<>(partitionRecords.size()); - for (Map.Entry<TopicPartition, MemoryRecords> entry : partitionRecords.entrySet()) - result.put(entry.getKey(), entry.getValue().sizeInBytes()); - 
return result; - } - - public ProduceRequest(Struct struct, short version) { + public ProduceRequest(ProduceRequestData produceRequestData, short version) { super(ApiKeys.PRODUCE, version); - partitionRecords = new HashMap<>(); - for (Object topicDataObj : struct.getArray(TOPIC_DATA_KEY_NAME)) { - Struct topicData = (Struct) topicDataObj; - String topic = topicData.get(TOPIC_NAME); - for (Object partitionResponseObj : topicData.getArray(PARTITION_DATA_KEY_NAME)) { - Struct partitionResponse = (Struct) partitionResponseObj; - int partition = partitionResponse.get(PARTITION_ID); - MemoryRecords records = (MemoryRecords) partitionResponse.getRecords(RECORD_SET_KEY_NAME); - setFlags(records); - partitionRecords.put(new TopicPartition(topic, partition), records); - } - } - partitionSizes = createPartitionSizes(partitionRecords); - acks = struct.getShort(ACKS_KEY_NAME); - timeout = struct.getInt(TIMEOUT_KEY_NAME); - transactionalId = struct.getOrElse(NULLABLE_TRANSACTIONAL_ID, null); - } - - private void setFlags(MemoryRecords records) { - Iterator<MutableRecordBatch> iterator = records.batches().iterator(); - MutableRecordBatch entry = iterator.next(); - hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId(); - hasTransactionalRecords = hasTransactionalRecords || entry.isTransactional(); + this.data = produceRequestData; + this.data.topicData().forEach(topicProduceData -> topicProduceData.partitions() + .forEach(partitionProduceData -> { + MemoryRecords records = MemoryRecords.readableRecords(partitionProduceData.records()); + Iterator<MutableRecordBatch> iterator = records.batches().iterator(); + MutableRecordBatch entry = iterator.next(); + hasIdempotentRecords = hasIdempotentRecords || entry.hasProducerId(); Review comment: Would it make sense to move this to the builder where we are already doing a pass over the partitions? 
########## File path: clients/src/main/resources/common/message/ProduceRequest.json ########## @@ -33,21 +33,21 @@ "validVersions": "0-8", "flexibleVersions": "none", "fields": [ - { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId", + { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "ignorable": true, "entityType": "transactionalId", "about": "The transactional ID, or null if the producer is not transactional." }, { "name": "Acks", "type": "int16", "versions": "0+", "about": "The number of acknowledgments the producer requires the leader to have received before considering a request complete. Allowed values: 0 for no acknowledgments, 1 for only the leader and -1 for the full ISR." }, - { "name": "TimeoutMs", "type": "int32", "versions": "0+", + { "name": "Timeout", "type": "int32", "versions": "0+", Review comment: nit: I kind of liked the original name (`TimeoutMs`) because it makes the unit clear. We're probably not consistent about this convention, though. 
########## File path: clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java ########## @@ -17,179 +17,48 @@ package org.apache.kafka.common.requests; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.message.ProduceResponseData; +import org.apache.kafka.common.protocol.ByteBufferAccessor; import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; import org.apache.kafka.common.protocol.types.Struct; import org.apache.kafka.common.record.RecordBatch; -import org.apache.kafka.common.utils.CollectionUtils; import java.nio.ByteBuffer; -import java.util.ArrayList; +import java.util.AbstractMap; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; - -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.INT64; +import java.util.stream.Collectors; /** - * This wrapper supports both v0 and v1 of ProduceResponse. + * This wrapper supports both v0 and v8 of ProduceResponse. */ public class ProduceResponse extends AbstractResponse { - - private static final String RESPONSES_KEY_NAME = "responses"; - - // topic level field names - private static final String PARTITION_RESPONSES_KEY_NAME = "partition_responses"; - public static final long INVALID_OFFSET = -1L; + private final ProduceResponseData data; + private final Map<TopicPartition, PartitionResponse> responses; - /** Review comment: Kind of a pity to lose this. Can we move it to the class documentation? 
########## File path: clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ########## @@ -218,6 +218,7 @@ private void checkSimpleRequestResponse(NetworkClient networkClient) { request.apiKey().responseHeaderVersion(PRODUCE.latestVersion())); Struct resp = new Struct(PRODUCE.responseSchema(PRODUCE.latestVersion())); resp.set("responses", new Object[0]); + resp.set(CommonFields.THROTTLE_TIME_MS, 100); Review comment: Wondering if we may as well rewrite this using `ProduceResponseData`. ########## File path: clients/src/main/resources/common/message/ProduceRequest.json ########## @@ -33,21 +33,21 @@ "validVersions": "0-8", "flexibleVersions": "none", "fields": [ - { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "0+", "entityType": "transactionalId", + { "name": "TransactionalId", "type": "string", "versions": "3+", "nullableVersions": "3+", "ignorable": true, "entityType": "transactionalId", Review comment: Hmm, not sure about making this ignorable. For transactional data, I think the broker would just fail if it cannot authorize the transactionalId. Also, should we set the default to null? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org