ijuma commented on code in PR #13043: URL: https://github.com/apache/kafka/pull/13043#discussion_r1060101460
########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -686,7 +687,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, private[log] def lastRecordsOfActiveProducers: Map[Long, LastRecord] = lock synchronized { producerStateManager.activeProducers.map { case (producerId, producerIdEntry) => val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0 ) Some(producerIdEntry.lastDataOffset) else None - val lastRecord = LastRecord(lastDataOffset, producerIdEntry.producerEpoch) + val lastRecord = new LastRecord( + if(lastDataOffset.isEmpty) OptionalLong.empty() else OptionalLong.of(lastDataOffset.get), Review Comment: Nit: space after `if`. ########## storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import java.util.Objects; +import java.util.OptionalLong; + +/** + * The last written record for a given producer. The last data offset may be undefined + * if the only log entry for a producer is a transaction marker. 
+ */ +public final class LastRecord { + public final OptionalLong lastDataOffset; + public final short producerEpoch; + + public LastRecord(OptionalLong lastDataOffset, short producerEpoch) { + this.lastDataOffset = lastDataOffset; + this.producerEpoch = producerEpoch; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LastRecord that = (LastRecord) o; + + if (producerEpoch != that.producerEpoch) return false; + return Objects.equals(lastDataOffset, that.lastDataOffset); Review Comment: `lastDataOffset` is never `null`. Maybe we can enforce that in the constructor for clarity and then avoid the null checks elsewhere. Also, we can use `&&` instead of `if/return`. ########## core/src/test/scala/unit/kafka/log/LogSegmentTest.scala: ########## @@ -354,9 +356,10 @@ class LogSegmentTest { // recover again, but this time assuming the transaction from pid2 began on a previous segment stateManager = newProducerStateManager() - stateManager.loadProducerEntry(new ProducerStateEntry(pid2, - mutable.Queue[BatchMetadata](BatchMetadata(10, 10L, 5, RecordBatch.NO_TIMESTAMP)), producerEpoch, - 0, RecordBatch.NO_TIMESTAMP, Some(75L))) + val batchMetadata = new util.ArrayList[BatchMetadata]() Review Comment: Is there a reason why we need to extract this versus passing inline like it was done before? 
########## core/src/main/scala/kafka/log/ProducerStateManager.scala: ########## @@ -406,13 +101,13 @@ object ProducerStateManager { val offsetDelta = producerEntryStruct.getInt(OffsetDeltaField) val coordinatorEpoch = producerEntryStruct.getInt(CoordinatorEpochField) val currentTxnFirstOffset = producerEntryStruct.getLong(CurrentTxnFirstOffsetField) - val lastAppendedDataBatches = mutable.Queue.empty[BatchMetadata] + val lastAppendedDataBatches = new java.util.ArrayList[BatchMetadata] if (offset >= 0) - lastAppendedDataBatches += BatchMetadata(seq, offset, offsetDelta, timestamp) + lastAppendedDataBatches.add(new BatchMetadata(seq, offset, offsetDelta, timestamp)) - val newEntry = new ProducerStateEntry(producerId, lastAppendedDataBatches, producerEpoch, - coordinatorEpoch, timestamp, if (currentTxnFirstOffset >= 0) Some(currentTxnFirstOffset) else None) - newEntry + val currentTxnFirstOffsetValue:OptionalLong = if (currentTxnFirstOffset >= 0) OptionalLong.of(currentTxnFirstOffset) else OptionalLong.empty() Review Comment: Nit: remove `:OptionalLong`. ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. + * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { + private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); + private final TopicPartition topicPartition; + public final long producerId; + private final ProducerStateEntry currentEntry; + private final AppendOrigin origin; + + private final List<TxnMetadata> transactions = new ArrayList<>(); + private final ProducerStateEntry updatedEntry; + + /** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. 
Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. + */ + public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { + this.topicPartition = topicPartition; + this.producerId = producerId; + this.currentEntry = currentEntry; + this.origin = origin; + + updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), + currentEntry.coordinatorEpoch, + currentEntry.lastTimestamp, + currentEntry.currentTxnFirstOffset); + } + + private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { + checkProducerEpoch(producerEpoch, offset); + if (origin == AppendOrigin.CLIENT) { + checkSequence(producerEpoch, firstSeq, offset); + } + } + + private void checkProducerEpoch(short producerEpoch, long offset) { + if (producerEpoch < updatedEntry.producerEpoch()) { + String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + + "which is smaller than the last seen epoch %d", producerId, offset, topicPartition, producerEpoch, updatedEntry.producerEpoch()); + + if (origin == AppendOrigin.REPLICATION) { + log.warn(message); + } else { + // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the + // producer send response callback to differentiate from the former fatal exception, + // letting client abort the ongoing transaction and retry. 
+ throw new InvalidProducerEpochException(message); + } + } + } + + private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) { + if (producerEpoch != updatedEntry.producerEpoch()) { + if (appendFirstSeq != 0) { + if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) { + throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId + + "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)"); Review Comment: Same question regarding `String.format` over String concat. ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. + * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { + private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); + private final TopicPartition topicPartition; + public final long producerId; + private final ProducerStateEntry currentEntry; + private final AppendOrigin origin; + + private final List<TxnMetadata> transactions = new ArrayList<>(); + private final ProducerStateEntry updatedEntry; + + /** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. 
Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. + */ + public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { + this.topicPartition = topicPartition; + this.producerId = producerId; + this.currentEntry = currentEntry; + this.origin = origin; + + updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), + currentEntry.coordinatorEpoch, + currentEntry.lastTimestamp, + currentEntry.currentTxnFirstOffset); + } + + private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { + checkProducerEpoch(producerEpoch, offset); + if (origin == AppendOrigin.CLIENT) { + checkSequence(producerEpoch, firstSeq, offset); + } + } + + private void checkProducerEpoch(short producerEpoch, long offset) { + if (producerEpoch < updatedEntry.producerEpoch()) { + String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + + "which is smaller than the last seen epoch %d", producerId, offset, topicPartition, producerEpoch, updatedEntry.producerEpoch()); + + if (origin == AppendOrigin.REPLICATION) { + log.warn(message); + } else { + // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the + // producer send response callback to differentiate from the former fatal exception, + // letting client abort the ongoing transaction and retry. 
+ throw new InvalidProducerEpochException(message); + } + } + } + + private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) { + if (producerEpoch != updatedEntry.producerEpoch()) { + if (appendFirstSeq != 0) { + if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) { + throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId + + "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)"); + } + } + } else { + int currentLastSeq; + if (!updatedEntry.isEmpty()) + currentLastSeq = updatedEntry.lastSeq(); + else if (producerEpoch == currentEntry.producerEpoch()) + currentLastSeq = currentEntry.lastSeq(); + else + currentLastSeq = RecordBatch.NO_SEQUENCE; + + // If there is no current producer epoch (possibly because all producer records have been deleted due to + // retention or the DeleteRecords API) accept writes with any sequence number + if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) { + throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " + + "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq + + " (incoming seq. 
number), " + currentLastSeq + " (current end sequence number)"); + } + } + } + + private boolean inSequence(int lastSeq, int nextSeq) { + return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE); + } + + public Optional<CompletedTxn> append(RecordBatch batch, Optional<LogOffsetMetadata> firstOffsetMetadataOpt) { + if (batch.isControlBatch()) { + Iterator<Record> recordIterator = batch.iterator(); + if (recordIterator.hasNext()) { + Record record = recordIterator.next(); + EndTransactionMarker endTxnMarker = EndTransactionMarker.deserialize(record); + return appendEndTxnMarker(endTxnMarker, batch.producerEpoch(), batch.baseOffset(), record.timestamp()); + } else { + // An empty control batch means the entire transaction has been cleaned from the log, so no need to append + return Optional.empty(); + } + } else { + LogOffsetMetadata firstOffsetMetadata = firstOffsetMetadataOpt.orElse(new LogOffsetMetadata(batch.baseOffset())); + appendDataBatch(batch.producerEpoch(), batch.baseSequence(), batch.lastSequence(), batch.maxTimestamp(), + firstOffsetMetadata, batch.lastOffset(), batch.isTransactional()); + return Optional.empty(); + } + } + + public void appendDataBatch(short epoch, + int firstSeq, + int lastSeq, + long lastTimestamp, + LogOffsetMetadata firstOffsetMetadata, + long lastOffset, + boolean isTransactional) { + long firstOffset = firstOffsetMetadata.messageOffset; + maybeValidateDataBatch(epoch, firstSeq, firstOffset); + updatedEntry.addBatch(epoch, lastSeq, lastOffset, (int) (lastOffset - firstOffset), lastTimestamp); Review Comment: Even though this was present in the previous code, I think it would be good to extract this unsafe cast into a separate method and do an overflow check. ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.RecordBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.stream.Stream; + +/** + * The batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the + * batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} + * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch. + */ +public class ProducerStateEntry { + public static final int NUM_BATCHES_TO_RETAIN = 5; + public final long producerId; + private final List<BatchMetadata> batchMetadata; + private short producerEpoch; + public int coordinatorEpoch; + public long lastTimestamp; + public OptionalLong currentTxnFirstOffset; + + public ProducerStateEntry(long producerId) { + this(producerId, new ArrayList<>(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); + } Review Comment: Why did we add this overload? Looks like the previous implementation didn't have it. 
########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.RecordBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.stream.Stream; + +/** + * The batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the + * batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} + * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch. 
+ */ +public class ProducerStateEntry { + public static final int NUM_BATCHES_TO_RETAIN = 5; + public final long producerId; + private final List<BatchMetadata> batchMetadata; + private short producerEpoch; + public int coordinatorEpoch; + public long lastTimestamp; + public OptionalLong currentTxnFirstOffset; + + public ProducerStateEntry(long producerId) { + this(producerId, new ArrayList<>(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); + } + + public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { + this(producerId, new ArrayList<>(), producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); + } + + public ProducerStateEntry(long producerId, List<BatchMetadata> batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { + this.producerId = producerId; + this.batchMetadata = batchMetadata; + this.producerEpoch = producerEpoch; + this.coordinatorEpoch = coordinatorEpoch; + this.lastTimestamp = lastTimestamp; + this.currentTxnFirstOffset = currentTxnFirstOffset; + } + + public int firstSeq() { + return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.get(0).firstSeq(); + } + + + public long firstDataOffset() { + return isEmpty() ? -1L : batchMetadata.get(0).firstOffset(); + } + + public int lastSeq() { + return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.get(batchMetadata.size() - 1).lastSeq; + } + + public long lastDataOffset() { + return isEmpty() ? -1L : batchMetadata.get(batchMetadata.size() - 1).lastOffset; + } + + public int lastOffsetDelta() { + return isEmpty() ? 0 : batchMetadata.get(batchMetadata.size() - 1).offsetDelta; Review Comment: You can add a private `lastOrFail` method to simplify these `batchMetadata.get(batchMetadata.size() - 1)` calls. For symmetry, you can also add a `firstOrFail` method. 
########## storage/src/main/java/org/apache/kafka/server/log/internals/LastRecord.java: ########## @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import java.util.Objects; +import java.util.OptionalLong; + +/** + * The last written record for a given producer. The last data offset may be undefined + * if the only log entry for a producer is a transaction marker. + */ +public final class LastRecord { + public final OptionalLong lastDataOffset; + public final short producerEpoch; + + public LastRecord(OptionalLong lastDataOffset, short producerEpoch) { + this.lastDataOffset = lastDataOffset; + this.producerEpoch = producerEpoch; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + LastRecord that = (LastRecord) o; + + if (producerEpoch != that.producerEpoch) return false; + return Objects.equals(lastDataOffset, that.lastDataOffset); + } + + @Override + public int hashCode() { + int result = lastDataOffset != null ? lastDataOffset.hashCode() : 0; + result = 31 * result + (int) producerEpoch; Review Comment: Is this cast actually needed? 
########## storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import java.util.Objects; +import java.util.OptionalLong; + +public final class TxnMetadata { + public final long producerId; + public final LogOffsetMetadata firstOffset; + public OptionalLong lastOffset; + + public TxnMetadata(long producerId, + LogOffsetMetadata firstOffset, + OptionalLong lastOffset) { + this.producerId = producerId; + this.firstOffset = firstOffset; + this.lastOffset = lastOffset; + } + public TxnMetadata(long producerId, long firstOffset) { + this(producerId, new LogOffsetMetadata(firstOffset)); + } + + public TxnMetadata(long producerId, LogOffsetMetadata firstOffset) { + this(producerId, firstOffset, OptionalLong.empty()); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TxnMetadata that = (TxnMetadata) o; + + if (producerId != that.producerId) return false; + if (!Objects.equals(firstOffset, that.firstOffset)) return false; Review Comment: `firstOffset` should never be null and 
hence we don't need to use Objects.equals. Similarly for the hashCode implementation. We can enforce that in the constructor for clarity perhaps. ########## storage/src/main/java/org/apache/kafka/server/log/internals/TxnMetadata.java: ########## @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import java.util.Objects; +import java.util.OptionalLong; + +public final class TxnMetadata { + public final long producerId; + public final LogOffsetMetadata firstOffset; + public OptionalLong lastOffset; + + public TxnMetadata(long producerId, + LogOffsetMetadata firstOffset, + OptionalLong lastOffset) { + this.producerId = producerId; + this.firstOffset = firstOffset; + this.lastOffset = lastOffset; + } + public TxnMetadata(long producerId, long firstOffset) { + this(producerId, new LogOffsetMetadata(firstOffset)); + } + + public TxnMetadata(long producerId, LogOffsetMetadata firstOffset) { + this(producerId, firstOffset, OptionalLong.empty()); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + TxnMetadata that = (TxnMetadata) o; + + if (producerId != that.producerId) return false; + if (!Objects.equals(firstOffset, that.firstOffset)) return false; + return Objects.equals(lastOffset, that.lastOffset); Review Comment: We should not use this in `equals` and `hashCode` if it's mutable. Looking at the usage of `TxnMetadata`, I wonder if it needs `equals` and `hashCode` at all. Maybe we should not override these methods since the semantics are a bit unsafe. ########## core/src/main/scala/kafka/log/UnifiedLog.scala: ########## @@ -686,7 +687,9 @@ class UnifiedLog(@volatile var logStartOffset: Long, private[log] def lastRecordsOfActiveProducers: Map[Long, LastRecord] = lock synchronized { producerStateManager.activeProducers.map { case (producerId, producerIdEntry) => val lastDataOffset = if (producerIdEntry.lastDataOffset >= 0 ) Some(producerIdEntry.lastDataOffset) else None Review Comment: It seems that we can create the `Optional` here since we immediately discard this `Option` in a couple of lines below. 
########## storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.DefaultRecordBatch; + +public class BatchMetadata { + + public final int lastSeq; + public final long lastOffset; + public final int offsetDelta; + public final long timestamp; + + public BatchMetadata( + int lastSeq, + long lastOffset, + int offsetDelta, + long timestamp) { + this.lastSeq = lastSeq; + this.lastOffset = lastOffset; + this.offsetDelta = offsetDelta; + this.timestamp = timestamp; + } + + public int firstSeq() { + return DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta); + } + + public long firstOffset() { + return lastOffset - offsetDelta; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + BatchMetadata that = (BatchMetadata) o; + + if (lastSeq != that.lastSeq) return false; + if (lastOffset != that.lastOffset) return false; + if (offsetDelta != that.offsetDelta) return false; + return timestamp == that.timestamp; + } + + @Override + public int 
hashCode() { + int result = lastSeq; + result = 31 * result + (int) (lastOffset ^ (lastOffset >>> 32)); Review Comment: Use `Long.hashCode` instead. The same applies a couple of lines below. ########## storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.DefaultRecordBatch; + +public class BatchMetadata { + + public final int lastSeq; + public final long lastOffset; + public final int offsetDelta; + public final long timestamp; + + public BatchMetadata( + int lastSeq, + long lastOffset, + int offsetDelta, + long timestamp) { + this.lastSeq = lastSeq; + this.lastOffset = lastOffset; + this.offsetDelta = offsetDelta; + this.timestamp = timestamp; + } + + public int firstSeq() { + return DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta); + } + + public long firstOffset() { + return lastOffset - offsetDelta; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + BatchMetadata that = (BatchMetadata) o; + + if (lastSeq != that.lastSeq) return false; + if (lastOffset != that.lastOffset) return false; + if (offsetDelta != that.offsetDelta) return false; Review Comment: Nit: should these be combined with `&&` versus multiple `if/return`? ########## storage/src/main/java/org/apache/kafka/server/log/internals/BatchMetadata.java: ########## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.DefaultRecordBatch; + +public class BatchMetadata { + + public final int lastSeq; + public final long lastOffset; + public final int offsetDelta; + public final long timestamp; + + public BatchMetadata( + int lastSeq, + long lastOffset, + int offsetDelta, + long timestamp) { + this.lastSeq = lastSeq; + this.lastOffset = lastOffset; + this.offsetDelta = offsetDelta; + this.timestamp = timestamp; + } + + public int firstSeq() { + return DefaultRecordBatch.decrementSequence(lastSeq, offsetDelta); + } + + public long firstOffset() { + return lastOffset - offsetDelta; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + + BatchMetadata that = (BatchMetadata) o; + + if (lastSeq != that.lastSeq) return false; + if (lastOffset != that.lastOffset) return false; + if (offsetDelta != that.offsetDelta) return false; + return timestamp == that.timestamp; + } + + @Override + public int hashCode() { + int result = lastSeq; + result = 31 * result + (int) (lastOffset ^ (lastOffset >>> 32)); + result = 31 * result + offsetDelta; + result = 31 * result + (int) (timestamp ^ (timestamp >>> 32)); + return result; + } + + @Override + public String toString() { + return "BatchMetadata(" + Review Comment: `firstSeq` and `firstOffset` are missing when compared to the previous implementation. ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. + * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. 
+ */ +public class ProducerAppendInfo { + private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); + private final TopicPartition topicPartition; + public final long producerId; Review Comment: Let's list `public` fields first. Does this need to be `public` btw? ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. + * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { + private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); + private final TopicPartition topicPartition; + public final long producerId; + private final ProducerStateEntry currentEntry; + private final AppendOrigin origin; + + private final List<TxnMetadata> transactions = new ArrayList<>(); + private final ProducerStateEntry updatedEntry; + + /** Review Comment: Nit: can we add something like `Creates new instance with the provided parameters.`? ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.RecordBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.stream.Stream; + +/** + * The batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the + * batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} + * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch. 
+ */ +public class ProducerStateEntry { + public static final int NUM_BATCHES_TO_RETAIN = 5; + public final long producerId; + private final List<BatchMetadata> batchMetadata; + private short producerEpoch; + public int coordinatorEpoch; + public long lastTimestamp; + public OptionalLong currentTxnFirstOffset; + + public ProducerStateEntry(long producerId) { + this(producerId, new ArrayList<>(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); + } + + public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { + this(producerId, new ArrayList<>(), producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); + } + + public ProducerStateEntry(long producerId, List<BatchMetadata> batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { + this.producerId = producerId; + this.batchMetadata = batchMetadata; + this.producerEpoch = producerEpoch; + this.coordinatorEpoch = coordinatorEpoch; + this.lastTimestamp = lastTimestamp; + this.currentTxnFirstOffset = currentTxnFirstOffset; + } + + public int firstSeq() { + return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.get(0).firstSeq(); + } + + + public long firstDataOffset() { + return isEmpty() ? -1L : batchMetadata.get(0).firstOffset(); + } + + public int lastSeq() { + return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.get(batchMetadata.size() - 1).lastSeq; + } + + public long lastDataOffset() { + return isEmpty() ? -1L : batchMetadata.get(batchMetadata.size() - 1).lastOffset; + } + + public int lastOffsetDelta() { + return isEmpty() ? 
0 : batchMetadata.get(batchMetadata.size() - 1).offsetDelta; + } + + public boolean isEmpty() { + return batchMetadata.isEmpty(); + } + + public void addBatch(short producerEpoch, int lastSeq, long lastOffset, int offsetDelta, long timestamp) { + maybeUpdateProducerEpoch(producerEpoch); + addBatchMetadata(new BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp)); + this.lastTimestamp = timestamp; + } + + public boolean maybeUpdateProducerEpoch(short producerEpoch) { + if (this.producerEpoch != producerEpoch) { + batchMetadata.clear(); + this.producerEpoch = producerEpoch; + return true; + } else { + return false; + } + } + + private void addBatchMetadata(BatchMetadata batch) { + if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) batchMetadata.remove(0); Review Comment: `remove(0)` is expensive for `ArrayList`. This is likely the reason why `Queue` had been used previously. I suggest using `ArrayDeque` as the implementation and `java.util.Queue` or `java.util.Deque` as the interface. ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. + * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { + private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); + private final TopicPartition topicPartition; + public final long producerId; + private final ProducerStateEntry currentEntry; + private final AppendOrigin origin; + + private final List<TxnMetadata> transactions = new ArrayList<>(); + private final ProducerStateEntry updatedEntry; + + /** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. 
Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. + */ + public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { + this.topicPartition = topicPartition; + this.producerId = producerId; + this.currentEntry = currentEntry; + this.origin = origin; + + updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), + currentEntry.coordinatorEpoch, + currentEntry.lastTimestamp, + currentEntry.currentTxnFirstOffset); + } + + private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { + checkProducerEpoch(producerEpoch, offset); + if (origin == AppendOrigin.CLIENT) { + checkSequence(producerEpoch, firstSeq, offset); + } + } + + private void checkProducerEpoch(short producerEpoch, long offset) { + if (producerEpoch < updatedEntry.producerEpoch()) { + String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + Review Comment: Any reason to prefer `String.format` over plain old String concat? The latter is generally more efficient. ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. + * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. 
+ */ +public class ProducerAppendInfo { + private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); + private final TopicPartition topicPartition; + public final long producerId; + private final ProducerStateEntry currentEntry; + private final AppendOrigin origin; + + private final List<TxnMetadata> transactions = new ArrayList<>(); + private final ProducerStateEntry updatedEntry; + + /** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. 
+ */ + public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { + this.topicPartition = topicPartition; + this.producerId = producerId; + this.currentEntry = currentEntry; + this.origin = origin; + + updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), + currentEntry.coordinatorEpoch, + currentEntry.lastTimestamp, + currentEntry.currentTxnFirstOffset); + } + + private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { + checkProducerEpoch(producerEpoch, offset); + if (origin == AppendOrigin.CLIENT) { + checkSequence(producerEpoch, firstSeq, offset); + } + } + + private void checkProducerEpoch(short producerEpoch, long offset) { + if (producerEpoch < updatedEntry.producerEpoch()) { + String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + + "which is smaller than the last seen epoch %d", producerId, offset, topicPartition, producerEpoch, updatedEntry.producerEpoch()); + + if (origin == AppendOrigin.REPLICATION) { + log.warn(message); + } else { + // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the + // producer send response callback to differentiate from the former fatal exception, + // letting client abort the ongoing transaction and retry. + throw new InvalidProducerEpochException(message); + } + } + } + + private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) { + if (producerEpoch != updatedEntry.producerEpoch()) { + if (appendFirstSeq != 0) { + if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) { + throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId + + "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + + appendFirstSeq + " (seq. 
number), " + updatedEntry.producerEpoch() + " (current producer epoch)"); + } + } + } else { + int currentLastSeq; + if (!updatedEntry.isEmpty()) + currentLastSeq = updatedEntry.lastSeq(); + else if (producerEpoch == currentEntry.producerEpoch()) + currentLastSeq = currentEntry.lastSeq(); + else + currentLastSeq = RecordBatch.NO_SEQUENCE; + + // If there is no current producer epoch (possibly because all producer records have been deleted due to + // retention or the DeleteRecords API) accept writes with any sequence number + if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) { + throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " + + "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq + + " (incoming seq. number), " + currentLastSeq + " (current end sequence number)"); + } + } + } + + private boolean inSequence(int lastSeq, int nextSeq) { + return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE); + } + + public Optional<CompletedTxn> append(RecordBatch batch, Optional<LogOffsetMetadata> firstOffsetMetadataOpt) { + if (batch.isControlBatch()) { + Iterator<Record> recordIterator = batch.iterator(); + if (recordIterator.hasNext()) { + Record record = recordIterator.next(); + EndTransactionMarker endTxnMarker = EndTransactionMarker.deserialize(record); + return appendEndTxnMarker(endTxnMarker, batch.producerEpoch(), batch.baseOffset(), record.timestamp()); + } else { + // An empty control batch means the entire transaction has been cleaned from the log, so no need to append + return Optional.empty(); + } + } else { + LogOffsetMetadata firstOffsetMetadata = firstOffsetMetadataOpt.orElse(new LogOffsetMetadata(batch.baseOffset())); + appendDataBatch(batch.producerEpoch(), batch.baseSequence(), batch.lastSequence(), batch.maxTimestamp(), + firstOffsetMetadata, batch.lastOffset(), batch.isTransactional()); 
+ return Optional.empty(); + } + } + + public void appendDataBatch(short epoch, + int firstSeq, + int lastSeq, + long lastTimestamp, + LogOffsetMetadata firstOffsetMetadata, + long lastOffset, + boolean isTransactional) { + long firstOffset = firstOffsetMetadata.messageOffset; + maybeValidateDataBatch(epoch, firstSeq, firstOffset); + updatedEntry.addBatch(epoch, lastSeq, lastOffset, (int) (lastOffset - firstOffset), lastTimestamp); + + OptionalLong currentTxnFirstOffset = updatedEntry.currentTxnFirstOffset; + if (currentTxnFirstOffset.isPresent()) { + if (!isTransactional) Review Comment: Should this be moved into the previous `if` block? ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. + * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { + private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); + private final TopicPartition topicPartition; + public final long producerId; + private final ProducerStateEntry currentEntry; + private final AppendOrigin origin; + + private final List<TxnMetadata> transactions = new ArrayList<>(); + private final ProducerStateEntry updatedEntry; + + /** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. 
Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. + */ + public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { + this.topicPartition = topicPartition; + this.producerId = producerId; + this.currentEntry = currentEntry; + this.origin = origin; + + updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), + currentEntry.coordinatorEpoch, + currentEntry.lastTimestamp, + currentEntry.currentTxnFirstOffset); + } + + private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { + checkProducerEpoch(producerEpoch, offset); + if (origin == AppendOrigin.CLIENT) { + checkSequence(producerEpoch, firstSeq, offset); + } + } + + private void checkProducerEpoch(short producerEpoch, long offset) { + if (producerEpoch < updatedEntry.producerEpoch()) { + String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + + "which is smaller than the last seen epoch %d", producerId, offset, topicPartition, producerEpoch, updatedEntry.producerEpoch()); + + if (origin == AppendOrigin.REPLICATION) { + log.warn(message); + } else { + // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the + // producer send response callback to differentiate from the former fatal exception, + // letting client abort the ongoing transaction and retry. 
+ throw new InvalidProducerEpochException(message); + } + } + } + + private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) { + if (producerEpoch != updatedEntry.producerEpoch()) { + if (appendFirstSeq != 0) { + if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) { + throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId + + "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)"); + } + } + } else { + int currentLastSeq; + if (!updatedEntry.isEmpty()) + currentLastSeq = updatedEntry.lastSeq(); + else if (producerEpoch == currentEntry.producerEpoch()) + currentLastSeq = currentEntry.lastSeq(); + else + currentLastSeq = RecordBatch.NO_SEQUENCE; + + // If there is no current producer epoch (possibly because all producer records have been deleted due to + // retention or the DeleteRecords API) accept writes with any sequence number + if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) { + throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " + + "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq + + " (incoming seq. 
number), " + currentLastSeq + " (current end sequence number)"); + } + } + } + + private boolean inSequence(int lastSeq, int nextSeq) { + return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE); + } + + public Optional<CompletedTxn> append(RecordBatch batch, Optional<LogOffsetMetadata> firstOffsetMetadataOpt) { + if (batch.isControlBatch()) { + Iterator<Record> recordIterator = batch.iterator(); + if (recordIterator.hasNext()) { + Record record = recordIterator.next(); + EndTransactionMarker endTxnMarker = EndTransactionMarker.deserialize(record); + return appendEndTxnMarker(endTxnMarker, batch.producerEpoch(), batch.baseOffset(), record.timestamp()); + } else { + // An empty control batch means the entire transaction has been cleaned from the log, so no need to append + return Optional.empty(); + } + } else { + LogOffsetMetadata firstOffsetMetadata = firstOffsetMetadataOpt.orElse(new LogOffsetMetadata(batch.baseOffset())); + appendDataBatch(batch.producerEpoch(), batch.baseSequence(), batch.lastSequence(), batch.maxTimestamp(), + firstOffsetMetadata, batch.lastOffset(), batch.isTransactional()); + return Optional.empty(); + } + } + + public void appendDataBatch(short epoch, + int firstSeq, + int lastSeq, + long lastTimestamp, + LogOffsetMetadata firstOffsetMetadata, + long lastOffset, + boolean isTransactional) { + long firstOffset = firstOffsetMetadata.messageOffset; + maybeValidateDataBatch(epoch, firstSeq, firstOffset); + updatedEntry.addBatch(epoch, lastSeq, lastOffset, (int) (lastOffset - firstOffset), lastTimestamp); + + OptionalLong currentTxnFirstOffset = updatedEntry.currentTxnFirstOffset; + if (currentTxnFirstOffset.isPresent()) { + if (!isTransactional) + // Received a non-transactional message while a transaction is active + throw new InvalidTxnStateException("Expected transactional write from producer " + producerId + " at " + + "offset " + firstOffsetMetadata + " in partition " + topicPartition); + } else { + if 
(isTransactional) { + // Began a new transaction + updatedEntry.currentTxnFirstOffset = OptionalLong.of(firstOffset); + transactions.add(new TxnMetadata(producerId, firstOffsetMetadata)); + } + } + } + + private void checkCoordinatorEpoch(EndTransactionMarker endTxnMarker, long offset) { + if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch()) { + if (origin == AppendOrigin.REPLICATION) { + log.info("Detected invalid coordinator epoch for producerId " + producerId + " at " + Review Comment: We should use log placeholders here so that we avoid the string concat if the log is not enabled. ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. + * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { + private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); + private final TopicPartition topicPartition; + public final long producerId; + private final ProducerStateEntry currentEntry; + private final AppendOrigin origin; + + private final List<TxnMetadata> transactions = new ArrayList<>(); + private final ProducerStateEntry updatedEntry; + + /** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. 
Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. + */ + public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { + this.topicPartition = topicPartition; + this.producerId = producerId; + this.currentEntry = currentEntry; + this.origin = origin; + + updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), + currentEntry.coordinatorEpoch, + currentEntry.lastTimestamp, + currentEntry.currentTxnFirstOffset); + } + + private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { + checkProducerEpoch(producerEpoch, offset); + if (origin == AppendOrigin.CLIENT) { + checkSequence(producerEpoch, firstSeq, offset); + } + } + + private void checkProducerEpoch(short producerEpoch, long offset) { + if (producerEpoch < updatedEntry.producerEpoch()) { + String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + + "which is smaller than the last seen epoch %d", producerId, offset, topicPartition, producerEpoch, updatedEntry.producerEpoch()); + + if (origin == AppendOrigin.REPLICATION) { + log.warn(message); + } else { + // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the + // producer send response callback to differentiate from the former fatal exception, + // letting client abort the ongoing transaction and retry. 
+ throw new InvalidProducerEpochException(message); + } + } + } + + private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) { + if (producerEpoch != updatedEntry.producerEpoch()) { + if (appendFirstSeq != 0) { + if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) { + throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId + + "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)"); + } + } + } else { + int currentLastSeq; + if (!updatedEntry.isEmpty()) + currentLastSeq = updatedEntry.lastSeq(); + else if (producerEpoch == currentEntry.producerEpoch()) + currentLastSeq = currentEntry.lastSeq(); + else + currentLastSeq = RecordBatch.NO_SEQUENCE; + + // If there is no current producer epoch (possibly because all producer records have been deleted due to + // retention or the DeleteRecords API) accept writes with any sequence number + if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) { + throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " + + "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq + + " (incoming seq. 
number), " + currentLastSeq + " (current end sequence number)"); + } + } + } + + private boolean inSequence(int lastSeq, int nextSeq) { + return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE); + } + + public Optional<CompletedTxn> append(RecordBatch batch, Optional<LogOffsetMetadata> firstOffsetMetadataOpt) { + if (batch.isControlBatch()) { + Iterator<Record> recordIterator = batch.iterator(); + if (recordIterator.hasNext()) { + Record record = recordIterator.next(); + EndTransactionMarker endTxnMarker = EndTransactionMarker.deserialize(record); + return appendEndTxnMarker(endTxnMarker, batch.producerEpoch(), batch.baseOffset(), record.timestamp()); + } else { + // An empty control batch means the entire transaction has been cleaned from the log, so no need to append + return Optional.empty(); + } + } else { + LogOffsetMetadata firstOffsetMetadata = firstOffsetMetadataOpt.orElse(new LogOffsetMetadata(batch.baseOffset())); + appendDataBatch(batch.producerEpoch(), batch.baseSequence(), batch.lastSequence(), batch.maxTimestamp(), + firstOffsetMetadata, batch.lastOffset(), batch.isTransactional()); + return Optional.empty(); + } + } + + public void appendDataBatch(short epoch, + int firstSeq, + int lastSeq, + long lastTimestamp, + LogOffsetMetadata firstOffsetMetadata, + long lastOffset, + boolean isTransactional) { + long firstOffset = firstOffsetMetadata.messageOffset; + maybeValidateDataBatch(epoch, firstSeq, firstOffset); + updatedEntry.addBatch(epoch, lastSeq, lastOffset, (int) (lastOffset - firstOffset), lastTimestamp); + + OptionalLong currentTxnFirstOffset = updatedEntry.currentTxnFirstOffset; + if (currentTxnFirstOffset.isPresent()) { + if (!isTransactional) + // Received a non-transactional message while a transaction is active + throw new InvalidTxnStateException("Expected transactional write from producer " + producerId + " at " + + "offset " + firstOffsetMetadata + " in partition " + topicPartition); + } else { + if 
(isTransactional) { + // Began a new transaction + updatedEntry.currentTxnFirstOffset = OptionalLong.of(firstOffset); + transactions.add(new TxnMetadata(producerId, firstOffsetMetadata)); + } + } + } + + private void checkCoordinatorEpoch(EndTransactionMarker endTxnMarker, long offset) { + if (updatedEntry.coordinatorEpoch > endTxnMarker.coordinatorEpoch()) { + if (origin == AppendOrigin.REPLICATION) { + log.info("Detected invalid coordinator epoch for producerId " + producerId + " at " + + "offset " + offset + " in partition $topicPartition: " + endTxnMarker.coordinatorEpoch() + + " is older than previously known coordinator epoch " + updatedEntry.coordinatorEpoch); + } else { + throw new TransactionCoordinatorFencedException("Invalid coordinator epoch for producerId " + producerId + " at " + + "offset " + offset + " in partition " + topicPartition + ": " + endTxnMarker.coordinatorEpoch() + + " (zombie), " + updatedEntry.coordinatorEpoch + " (current)"); + } + } + } + + public Optional<CompletedTxn> appendEndTxnMarker( + EndTransactionMarker endTxnMarker, + short producerEpoch, + long offset, + long timestamp) { + checkProducerEpoch(producerEpoch, offset); + checkCoordinatorEpoch(endTxnMarker, offset); + + // Only emit the `CompletedTxn` for non-empty transactions. A transaction marker + // without any associated data will not have any impact on the last stable offset + // and would not need to be reflected in the transaction index. + Optional<CompletedTxn> completedTxn = updatedEntry.currentTxnFirstOffset.isPresent() ? 
+ Optional.of(new CompletedTxn(producerId, updatedEntry.currentTxnFirstOffset.getAsLong(), offset, + endTxnMarker.controlType() == ControlRecordType.ABORT)) + : Optional.empty(); + + updatedEntry.maybeUpdateProducerEpoch(producerEpoch); + updatedEntry.currentTxnFirstOffset = OptionalLong.empty(); + updatedEntry.coordinatorEpoch = endTxnMarker.coordinatorEpoch(); + updatedEntry.lastTimestamp = timestamp; + + return completedTxn; + } + + public ProducerStateEntry toEntry() { + return updatedEntry; + } + + public List<TxnMetadata> startedTransactions() { + return Collections.unmodifiableList(transactions); + } + + @Override + public String toString() { + return "ProducerAppendInfo(" + Review Comment: This implementation is different from the previous one in many ways. ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.RecordBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.stream.Stream; + +/** + * The batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the + * batch with the highest sequence is at the tail of the queue. We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} + * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch. + */ +public class ProducerStateEntry { + public static final int NUM_BATCHES_TO_RETAIN = 5; + public final long producerId; + private final List<BatchMetadata> batchMetadata; + private short producerEpoch; + public int coordinatorEpoch; + public long lastTimestamp; + public OptionalLong currentTxnFirstOffset; + + public ProducerStateEntry(long producerId) { + this(producerId, new ArrayList<>(), RecordBatch.NO_PRODUCER_EPOCH, -1, RecordBatch.NO_TIMESTAMP, OptionalLong.empty()); + } + + public ProducerStateEntry(long producerId, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { + this(producerId, new ArrayList<>(), producerEpoch, coordinatorEpoch, lastTimestamp, currentTxnFirstOffset); + } + + public ProducerStateEntry(long producerId, List<BatchMetadata> batchMetadata, short producerEpoch, int coordinatorEpoch, long lastTimestamp, OptionalLong currentTxnFirstOffset) { + this.producerId = producerId; + this.batchMetadata = batchMetadata; + this.producerEpoch = producerEpoch; + this.coordinatorEpoch = coordinatorEpoch; + this.lastTimestamp = lastTimestamp; + this.currentTxnFirstOffset = currentTxnFirstOffset; + } + + public int firstSeq() { + return isEmpty() ? 
RecordBatch.NO_SEQUENCE : batchMetadata.get(0).firstSeq(); + } + + + public long firstDataOffset() { + return isEmpty() ? -1L : batchMetadata.get(0).firstOffset(); + } + + public int lastSeq() { + return isEmpty() ? RecordBatch.NO_SEQUENCE : batchMetadata.get(batchMetadata.size() - 1).lastSeq; + } + + public long lastDataOffset() { + return isEmpty() ? -1L : batchMetadata.get(batchMetadata.size() - 1).lastOffset; + } + + public int lastOffsetDelta() { + return isEmpty() ? 0 : batchMetadata.get(batchMetadata.size() - 1).offsetDelta; + } + + public boolean isEmpty() { + return batchMetadata.isEmpty(); + } + + public void addBatch(short producerEpoch, int lastSeq, long lastOffset, int offsetDelta, long timestamp) { + maybeUpdateProducerEpoch(producerEpoch); + addBatchMetadata(new BatchMetadata(lastSeq, lastOffset, offsetDelta, timestamp)); + this.lastTimestamp = timestamp; + } + + public boolean maybeUpdateProducerEpoch(short producerEpoch) { + if (this.producerEpoch != producerEpoch) { + batchMetadata.clear(); + this.producerEpoch = producerEpoch; + return true; + } else { + return false; + } + } + + private void addBatchMetadata(BatchMetadata batch) { + if (batchMetadata.size() == ProducerStateEntry.NUM_BATCHES_TO_RETAIN) batchMetadata.remove(0); + batchMetadata.add(batch); + } + + public void update(ProducerStateEntry nextEntry) { + maybeUpdateProducerEpoch(nextEntry.producerEpoch); + while (!nextEntry.batchMetadata.isEmpty()) addBatchMetadata(nextEntry.batchMetadata.remove(0)); + this.coordinatorEpoch = nextEntry.coordinatorEpoch; + this.currentTxnFirstOffset = nextEntry.currentTxnFirstOffset; + this.lastTimestamp = nextEntry.lastTimestamp; + } + + public Optional<BatchMetadata> findDuplicateBatch(RecordBatch batch) { + if (batch.producerEpoch() != producerEpoch) return Optional.empty(); + else return batchWithSequenceRange(batch.baseSequence(), batch.lastSequence()); + } + + // Return the batch metadata of the cached batch having the exact sequence range, if any. 
+ Optional<BatchMetadata> batchWithSequenceRange(int firstSeq, int lastSeq) { + Stream<BatchMetadata> duplicate = batchMetadata.stream().filter(metadata -> firstSeq == metadata.firstSeq() && lastSeq == metadata.lastSeq); + return duplicate.findFirst(); + } + + public List<BatchMetadata> batchMetadata() { + return Collections.unmodifiableList(batchMetadata); + } + + public short producerEpoch() { + return producerEpoch; + } + + @Override + public String toString() { Review Comment: This implementation differs from the original one. Is there a reason for that? ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerStateEntry.java: ########## @@ -0,0 +1,143 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.record.RecordBatch; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; +import java.util.stream.Stream; + +/** + * The batchMetadata is ordered such that the batch with the lowest sequence is at the head of the queue while the + * batch with the highest sequence is at the tail of the queue. 
We will retain at most {@link ProducerStateEntry#NUM_BATCHES_TO_RETAIN} + * elements in the queue. When the queue is at capacity, we remove the first element to make space for the incoming batch. + */ +public class ProducerStateEntry { + public static final int NUM_BATCHES_TO_RETAIN = 5; + public final long producerId; + private final List<BatchMetadata> batchMetadata; + private short producerEpoch; + public int coordinatorEpoch; + public long lastTimestamp; + public OptionalLong currentTxnFirstOffset; Review Comment: Do we need these mutable fields to be public? Can we instead expose a mutating method that updates them all together? ########## storage/src/main/java/org/apache/kafka/server/log/internals/ProducerAppendInfo.java: ########## @@ -0,0 +1,238 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.server.log.internals; + +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.errors.InvalidProducerEpochException; +import org.apache.kafka.common.errors.InvalidTxnStateException; +import org.apache.kafka.common.errors.OutOfOrderSequenceException; +import org.apache.kafka.common.errors.TransactionCoordinatorFencedException; +import org.apache.kafka.common.record.ControlRecordType; +import org.apache.kafka.common.record.EndTransactionMarker; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.RecordBatch; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Optional; +import java.util.OptionalLong; + +/** + * This class is used to validate the records appended by a given producer before they are written to the log. + * It is initialized with the producer's state after the last successful append, and transitively validates the + * sequence numbers and epochs of each new record. Additionally, this class accumulates transaction metadata + * as the incoming records are validated. + */ +public class ProducerAppendInfo { + private static final Logger log = LoggerFactory.getLogger(ProducerAppendInfo.class); + private final TopicPartition topicPartition; + public final long producerId; + private final ProducerStateEntry currentEntry; + private final AppendOrigin origin; + + private final List<TxnMetadata> transactions = new ArrayList<>(); + private final ProducerStateEntry updatedEntry; + + /** + * @param topicPartition topic partition + * @param producerId The id of the producer appending to the log + * @param currentEntry The current entry associated with the producer id which contains metadata for a fixed number of + * the most recent appends made by the producer. 
Validation of the first incoming append will + * be made against the latest append in the current entry. New appends will replace older appends + * in the current entry so that the space overhead is constant. + * @param origin Indicates the origin of the append which implies the extent of validation. For example, offset + * commits, which originate from the group coordinator, do not have sequence numbers and therefore + * only producer epoch validation is done. Appends which come through replication are not validated + * (we assume the validation has already been done) and appends from clients require full validation. + */ + public ProducerAppendInfo(TopicPartition topicPartition, + long producerId, + ProducerStateEntry currentEntry, + AppendOrigin origin) { + this.topicPartition = topicPartition; + this.producerId = producerId; + this.currentEntry = currentEntry; + this.origin = origin; + + updatedEntry = new ProducerStateEntry(producerId, currentEntry.producerEpoch(), + currentEntry.coordinatorEpoch, + currentEntry.lastTimestamp, + currentEntry.currentTxnFirstOffset); + } + + private void maybeValidateDataBatch(short producerEpoch, int firstSeq, long offset) { + checkProducerEpoch(producerEpoch, offset); + if (origin == AppendOrigin.CLIENT) { + checkSequence(producerEpoch, firstSeq, offset); + } + } + + private void checkProducerEpoch(short producerEpoch, long offset) { + if (producerEpoch < updatedEntry.producerEpoch()) { + String message = String.format("Epoch of producer %d at offset %d in %s is %d, " + + "which is smaller than the last seen epoch %d", producerId, offset, topicPartition, producerEpoch, updatedEntry.producerEpoch()); + + if (origin == AppendOrigin.REPLICATION) { + log.warn(message); + } else { + // Starting from 2.7, we replaced ProducerFenced error with InvalidProducerEpoch in the + // producer send response callback to differentiate from the former fatal exception, + // letting client abort the ongoing transaction and retry. 
+ throw new InvalidProducerEpochException(message); + } + } + } + + private void checkSequence(short producerEpoch, int appendFirstSeq, long offset) { + if (producerEpoch != updatedEntry.producerEpoch()) { + if (appendFirstSeq != 0) { + if (updatedEntry.producerEpoch() != RecordBatch.NO_PRODUCER_EPOCH) { + throw new OutOfOrderSequenceException("Invalid sequence number for new epoch of producer " + producerId + + "at offset " + offset + " in partition " + topicPartition + ": " + producerEpoch + " (request epoch), " + + appendFirstSeq + " (seq. number), " + updatedEntry.producerEpoch() + " (current producer epoch)"); + } + } + } else { + int currentLastSeq; + if (!updatedEntry.isEmpty()) + currentLastSeq = updatedEntry.lastSeq(); + else if (producerEpoch == currentEntry.producerEpoch()) + currentLastSeq = currentEntry.lastSeq(); + else + currentLastSeq = RecordBatch.NO_SEQUENCE; + + // If there is no current producer epoch (possibly because all producer records have been deleted due to + // retention or the DeleteRecords API) accept writes with any sequence number + if (!(currentEntry.producerEpoch() == RecordBatch.NO_PRODUCER_EPOCH || inSequence(currentLastSeq, appendFirstSeq))) { + throw new OutOfOrderSequenceException("Out of order sequence number for producer " + producerId + " at " + + "offset " + offset + " in partition " + topicPartition + ": " + appendFirstSeq + + " (incoming seq. 
number), " + currentLastSeq + " (current end sequence number)"); + } + } + } + + private boolean inSequence(int lastSeq, int nextSeq) { + return nextSeq == lastSeq + 1L || (nextSeq == 0 && lastSeq == Integer.MAX_VALUE); + } + + public Optional<CompletedTxn> append(RecordBatch batch, Optional<LogOffsetMetadata> firstOffsetMetadataOpt) { + if (batch.isControlBatch()) { + Iterator<Record> recordIterator = batch.iterator(); + if (recordIterator.hasNext()) { + Record record = recordIterator.next(); + EndTransactionMarker endTxnMarker = EndTransactionMarker.deserialize(record); + return appendEndTxnMarker(endTxnMarker, batch.producerEpoch(), batch.baseOffset(), record.timestamp()); + } else { + // An empty control batch means the entire transaction has been cleaned from the log, so no need to append + return Optional.empty(); + } + } else { + LogOffsetMetadata firstOffsetMetadata = firstOffsetMetadataOpt.orElse(new LogOffsetMetadata(batch.baseOffset())); + appendDataBatch(batch.producerEpoch(), batch.baseSequence(), batch.lastSequence(), batch.maxTimestamp(), + firstOffsetMetadata, batch.lastOffset(), batch.isTransactional()); + return Optional.empty(); + } + } + + public void appendDataBatch(short epoch, + int firstSeq, + int lastSeq, + long lastTimestamp, + LogOffsetMetadata firstOffsetMetadata, + long lastOffset, + boolean isTransactional) { + long firstOffset = firstOffsetMetadata.messageOffset; + maybeValidateDataBatch(epoch, firstSeq, firstOffset); + updatedEntry.addBatch(epoch, lastSeq, lastOffset, (int) (lastOffset - firstOffset), lastTimestamp); + + OptionalLong currentTxnFirstOffset = updatedEntry.currentTxnFirstOffset; + if (currentTxnFirstOffset.isPresent()) { + if (!isTransactional) + // Received a non-transactional message while a transaction is active + throw new InvalidTxnStateException("Expected transactional write from producer " + producerId + " at " + + "offset " + firstOffsetMetadata + " in partition " + topicPartition); + } else { + if 
(isTransactional) { Review Comment: Should this be moved into the previous else block? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org