jsancio commented on a change in pull request #11529: URL: https://github.com/apache/kafka/pull/11529#discussion_r757955172
########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.record.ControlRecordUtils; + +import java.util.Optional; +import java.util.List; +import java.util.function.Supplier; + +final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> { + final private RawSnapshotWriter snapshot; + final private BatchAccumulator<T> accumulator; + final private Time time; + final private long lastContainedLogTimestamp; + + private RecordsSnapshotWriter( + RawSnapshotWriter snapshot, + int maxBatchSize, + MemoryPool memoryPool, 
+ Time time, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + this.snapshot = snapshot; + this.time = time; + this.lastContainedLogTimestamp = lastContainedLogTimestamp; + + this.accumulator = new BatchAccumulator<>( + snapshot.snapshotId().epoch, + 0, + Integer.MAX_VALUE, + maxBatchSize, + memoryPool, + time, + compressionType, + serde + ); + } + + /** + * Adds a {@link SnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the snapshot is not empty + */ + public void initializeSnapshotWithHeader() { + if (snapshot.sizeInBytes() != 0) { + String message = String.format( + "Initializing writer with a non-empty snapshot: id = '%s'.", + snapshot.snapshotId() + ); + throw new IllegalStateException(message); + } + + SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION) + .setLastContainedLogTimestamp(lastContainedLogTimestamp); + accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Adds a {@link SnapshotFooterRecord} to the snapshot + * + * No more records should be appended to the snapshot after calling this method + */ + private void finalizeSnapshotWithFooter() { + SnapshotFooterRecord footerRecord = new SnapshotFooterRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION); + accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Create an instance of this class and initialize + * the underlying snapshot with {@link SnapshotHeaderRecord} + * + * @param snapshot a lambda to create the low level snapshot writer + * @param maxBatchSize the maximum size in byte for a batch + * @param memoryPool the memory pool for buffer allocation + * @param time the clock implementation + * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot + * 
@param compressionType the compression algorithm to use + * @param serde the record serialization and deserialization implementation + * @return {@link Optional}{@link RecordsSnapshotWriter} + */ + public static <T> Optional<SnapshotWriter<T>> createWithHeader( + Supplier<Optional<RawSnapshotWriter>> supplier, + int maxBatchSize, + MemoryPool memoryPool, + Time snapshotTime, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> { + return new RecordsSnapshotWriter<T>( + snapshot, + maxBatchSize, + memoryPool, + snapshotTime, + lastContainedLogTimestamp, + CompressionType.NONE, + serde); + }); + writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader); + return writer; + } + + /** + * Returns the end offset and epoch for the snapshot. + */ + public OffsetAndEpoch snapshotId() { Review comment: Add the `@Override` annotation to this method. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.snapshot; + +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.record.ControlRecordUtils; + +import java.util.Optional; +import java.util.List; +import java.util.function.Supplier; + +final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> { + final private RawSnapshotWriter snapshot; + final private BatchAccumulator<T> accumulator; + final private Time time; + final private long lastContainedLogTimestamp; + + private RecordsSnapshotWriter( + RawSnapshotWriter snapshot, + int maxBatchSize, + MemoryPool memoryPool, + Time time, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + this.snapshot = snapshot; + this.time = time; + this.lastContainedLogTimestamp = lastContainedLogTimestamp; + + this.accumulator = new BatchAccumulator<>( + snapshot.snapshotId().epoch, + 0, + Integer.MAX_VALUE, + maxBatchSize, + memoryPool, + time, + compressionType, + serde + ); + } + + /** + * Adds a {@link SnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the snapshot is not empty + */ + public void initializeSnapshotWithHeader() { + if (snapshot.sizeInBytes() != 0) { + String message = String.format( + "Initializing writer with a non-empty snapshot: id = '%s'.", + snapshot.snapshotId() + ); + throw new IllegalStateException(message); + } + + SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION) + 
.setLastContainedLogTimestamp(lastContainedLogTimestamp); + accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Adds a {@link SnapshotFooterRecord} to the snapshot + * + * No more records should be appended to the snapshot after calling this method + */ + private void finalizeSnapshotWithFooter() { + SnapshotFooterRecord footerRecord = new SnapshotFooterRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION); + accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Create an instance of this class and initialize + * the underlying snapshot with {@link SnapshotHeaderRecord} + * + * @param snapshot a lambda to create the low level snapshot writer + * @param maxBatchSize the maximum size in byte for a batch + * @param memoryPool the memory pool for buffer allocation + * @param time the clock implementation + * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot + * @param compressionType the compression algorithm to use + * @param serde the record serialization and deserialization implementation + * @return {@link Optional}{@link RecordsSnapshotWriter} + */ + public static <T> Optional<SnapshotWriter<T>> createWithHeader( + Supplier<Optional<RawSnapshotWriter>> supplier, + int maxBatchSize, + MemoryPool memoryPool, + Time snapshotTime, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> { + return new RecordsSnapshotWriter<T>( + snapshot, + maxBatchSize, + memoryPool, + snapshotTime, + lastContainedLogTimestamp, + CompressionType.NONE, + serde); + }); + writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader); + return writer; + } + + /** + * Returns the end offset and epoch for the snapshot. 
+ */ + public OffsetAndEpoch snapshotId() { + return snapshot.snapshotId(); + } + + /** + * Returns the last log offset which is represented in the snapshot. + */ + public long lastContainedLogOffset() { Review comment: Add the `@Override` annotation to this method. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.snapshot; + +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.record.ControlRecordUtils; + +import java.util.Optional; +import java.util.List; +import java.util.function.Supplier; + +final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> { + final private RawSnapshotWriter snapshot; + final private BatchAccumulator<T> accumulator; + final private Time time; + final private long lastContainedLogTimestamp; + + private RecordsSnapshotWriter( + RawSnapshotWriter snapshot, + int maxBatchSize, + MemoryPool memoryPool, + Time time, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + this.snapshot = snapshot; + this.time = time; + this.lastContainedLogTimestamp = lastContainedLogTimestamp; + + this.accumulator = new BatchAccumulator<>( + snapshot.snapshotId().epoch, + 0, + Integer.MAX_VALUE, + maxBatchSize, + memoryPool, + time, + compressionType, + serde + ); + } + + /** + * Adds a {@link SnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the snapshot is not empty + */ + public void initializeSnapshotWithHeader() { + if (snapshot.sizeInBytes() != 0) { + String message = String.format( + "Initializing writer with a non-empty snapshot: id = '%s'.", + snapshot.snapshotId() + ); + throw new IllegalStateException(message); + } + + SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION) + 
.setLastContainedLogTimestamp(lastContainedLogTimestamp); + accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Adds a {@link SnapshotFooterRecord} to the snapshot + * + * No more records should be appended to the snapshot after calling this method + */ + private void finalizeSnapshotWithFooter() { + SnapshotFooterRecord footerRecord = new SnapshotFooterRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION); + accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Create an instance of this class and initialize + * the underlying snapshot with {@link SnapshotHeaderRecord} + * + * @param snapshot a lambda to create the low level snapshot writer + * @param maxBatchSize the maximum size in byte for a batch + * @param memoryPool the memory pool for buffer allocation + * @param time the clock implementation + * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot + * @param compressionType the compression algorithm to use + * @param serde the record serialization and deserialization implementation + * @return {@link Optional}{@link RecordsSnapshotWriter} + */ + public static <T> Optional<SnapshotWriter<T>> createWithHeader( + Supplier<Optional<RawSnapshotWriter>> supplier, + int maxBatchSize, + MemoryPool memoryPool, + Time snapshotTime, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> { + return new RecordsSnapshotWriter<T>( + snapshot, + maxBatchSize, + memoryPool, + snapshotTime, + lastContainedLogTimestamp, + CompressionType.NONE, + serde); + }); + writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader); + return writer; + } + + /** + * Returns the end offset and epoch for the snapshot. 
+ */ + public OffsetAndEpoch snapshotId() { + return snapshot.snapshotId(); + } + + /** + * Returns the last log offset which is represented in the snapshot. + */ + public long lastContainedLogOffset() { + return snapshot.snapshotId().offset - 1; + } + + /** + * Returns the epoch of the last log offset which is represented in the snapshot. + */ + public int lastContainedLogEpoch() { + return snapshot.snapshotId().epoch; + } + + /** + * Returns true if the snapshot has been frozen, otherwise false is returned. + * + * Modification to the snapshot are not allowed once it is frozen. + */ + public boolean isFrozen() { + return snapshot.isFrozen(); + } + + /** + * Appends a list of values to the snapshot. + * + * The list of record passed are guaranteed to get written together. + * + * @param records the list of records to append to the snapshot + * @throws IllegalStateException if append is called when isFrozen is true + */ + public void append(List<T> records) { + if (snapshot.isFrozen()) { + String message = String.format( + "Append not supported. Snapshot is already frozen: id = '%s'.", + snapshot.snapshotId() + ); + + throw new IllegalStateException(message); + } + + accumulator.append(snapshot.snapshotId().epoch, records); + + if (accumulator.needsDrain(time.milliseconds())) { + appendBatches(accumulator.drain()); + } + } + + /** + * Freezes the snapshot by flushing all pending writes and marking it as immutable. + * + * Also adds a {@link SnapshotFooterRecord} to the end of the snapshot + */ + public void freeze() { + finalizeSnapshotWithFooter(); + appendBatches(accumulator.drain()); + snapshot.freeze(); + accumulator.close(); + } + + /** + * Closes the snapshot writer. + * + * If close is called without first calling freeze the snapshot is aborted. + */ + public void close() { Review comment: Add the `@Override` annotation to this method. 
########## File path: raft/src/main/java/org/apache/kafka/snapshot/SnapshotWriter.java ########## @@ -183,52 +65,25 @@ public boolean isFrozen() { * @param records the list of records to append to the snapshot * @throws IllegalStateException if append is called when isFrozen is true */ - public void append(List<T> records) { - if (snapshot.isFrozen()) { - String message = String.format( - "Append not supported. Snapshot is already frozen: id = '%s'.", - snapshot.snapshotId() - ); - - throw new IllegalStateException(message); - } - - accumulator.append(snapshot.snapshotId().epoch, records); - - if (accumulator.needsDrain(time.milliseconds())) { - appendBatches(accumulator.drain()); - } - } + void append(List<T> records); /** * Freezes the snapshot by flushing all pending writes and marking it as immutable. * * Also adds a {@link SnapshotFooterRecord} to the end of the snapshot */ - public void freeze() { - finalizeSnapshotWithFooter(); - appendBatches(accumulator.drain()); - snapshot.freeze(); - accumulator.close(); - } + void freeze(); /** * Closes the snapshot writer. * * If close is called without first calling freeze the snapshot is aborted. */ - public void close() { - snapshot.close(); - accumulator.close(); - } + void close(); + + /** + * Initialize the snapshot with header + */ + void initializeSnapshotWithHeader(); Review comment: Let's remove this method `initializeSnapshotWithHeader` from the interface. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.record.ControlRecordUtils; + +import java.util.Optional; +import java.util.List; +import java.util.function.Supplier; + +final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> { + final private RawSnapshotWriter snapshot; + final private BatchAccumulator<T> accumulator; + final private Time time; + final private long lastContainedLogTimestamp; + + private RecordsSnapshotWriter( + RawSnapshotWriter snapshot, + int maxBatchSize, + MemoryPool memoryPool, + Time time, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + this.snapshot = snapshot; + this.time = time; + this.lastContainedLogTimestamp = lastContainedLogTimestamp; + + this.accumulator = new BatchAccumulator<>( + snapshot.snapshotId().epoch, + 0, + Integer.MAX_VALUE, + maxBatchSize, + memoryPool, + time, + compressionType, + serde + ); + } + + /** + * Adds a {@link SnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the 
snapshot is not empty + */ + public void initializeSnapshotWithHeader() { + if (snapshot.sizeInBytes() != 0) { + String message = String.format( + "Initializing writer with a non-empty snapshot: id = '%s'.", + snapshot.snapshotId() + ); + throw new IllegalStateException(message); + } + + SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION) + .setLastContainedLogTimestamp(lastContainedLogTimestamp); + accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Adds a {@link SnapshotFooterRecord} to the snapshot + * + * No more records should be appended to the snapshot after calling this method + */ + private void finalizeSnapshotWithFooter() { + SnapshotFooterRecord footerRecord = new SnapshotFooterRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION); + accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Create an instance of this class and initialize + * the underlying snapshot with {@link SnapshotHeaderRecord} + * + * @param snapshot a lambda to create the low level snapshot writer + * @param maxBatchSize the maximum size in byte for a batch + * @param memoryPool the memory pool for buffer allocation + * @param time the clock implementation + * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot + * @param compressionType the compression algorithm to use + * @param serde the record serialization and deserialization implementation + * @return {@link Optional}{@link RecordsSnapshotWriter} + */ + public static <T> Optional<SnapshotWriter<T>> createWithHeader( + Supplier<Optional<RawSnapshotWriter>> supplier, + int maxBatchSize, + MemoryPool memoryPool, + Time snapshotTime, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + Optional<SnapshotWriter<T>> 
writer = supplier.get().map(snapshot -> { + return new RecordsSnapshotWriter<T>( + snapshot, + maxBatchSize, + memoryPool, + snapshotTime, + lastContainedLogTimestamp, + CompressionType.NONE, + serde); + }); + writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader); + return writer; + } + + /** + * Returns the end offset and epoch for the snapshot. + */ + public OffsetAndEpoch snapshotId() { + return snapshot.snapshotId(); + } + + /** + * Returns the last log offset which is represented in the snapshot. + */ + public long lastContainedLogOffset() { + return snapshot.snapshotId().offset - 1; + } + + /** + * Returns the epoch of the last log offset which is represented in the snapshot. + */ + public int lastContainedLogEpoch() { + return snapshot.snapshotId().epoch; + } + + /** + * Returns true if the snapshot has been frozen, otherwise false is returned. + * + * Modification to the snapshot are not allowed once it is frozen. + */ + public boolean isFrozen() { + return snapshot.isFrozen(); + } + + /** + * Appends a list of values to the snapshot. + * + * The list of record passed are guaranteed to get written together. + * + * @param records the list of records to append to the snapshot + * @throws IllegalStateException if append is called when isFrozen is true + */ + public void append(List<T> records) { Review comment: Add the `@Override` annotation to this method. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.record.ControlRecordUtils; + +import java.util.Optional; +import java.util.List; +import java.util.function.Supplier; + +final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> { + final private RawSnapshotWriter snapshot; + final private BatchAccumulator<T> accumulator; + final private Time time; + final private long lastContainedLogTimestamp; + + private RecordsSnapshotWriter( + RawSnapshotWriter snapshot, + int maxBatchSize, + MemoryPool memoryPool, + Time time, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + this.snapshot = snapshot; + this.time = time; + this.lastContainedLogTimestamp = lastContainedLogTimestamp; + + this.accumulator = new BatchAccumulator<>( + snapshot.snapshotId().epoch, + 0, + Integer.MAX_VALUE, + maxBatchSize, + memoryPool, + time, + compressionType, + serde + ); + } + + /** + * Adds a {@link SnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the 
snapshot is not empty + */ + public void initializeSnapshotWithHeader() { + if (snapshot.sizeInBytes() != 0) { + String message = String.format( + "Initializing writer with a non-empty snapshot: id = '%s'.", + snapshot.snapshotId() + ); + throw new IllegalStateException(message); + } + + SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION) + .setLastContainedLogTimestamp(lastContainedLogTimestamp); + accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Adds a {@link SnapshotFooterRecord} to the snapshot + * + * No more records should be appended to the snapshot after calling this method + */ + private void finalizeSnapshotWithFooter() { + SnapshotFooterRecord footerRecord = new SnapshotFooterRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION); + accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Create an instance of this class and initialize + * the underlying snapshot with {@link SnapshotHeaderRecord} + * + * @param snapshot a lambda to create the low level snapshot writer + * @param maxBatchSize the maximum size in byte for a batch + * @param memoryPool the memory pool for buffer allocation + * @param time the clock implementation + * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot + * @param compressionType the compression algorithm to use + * @param serde the record serialization and deserialization implementation + * @return {@link Optional}{@link RecordsSnapshotWriter} + */ + public static <T> Optional<SnapshotWriter<T>> createWithHeader( + Supplier<Optional<RawSnapshotWriter>> supplier, + int maxBatchSize, + MemoryPool memoryPool, + Time snapshotTime, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + Optional<SnapshotWriter<T>> 
writer = supplier.get().map(snapshot -> { + return new RecordsSnapshotWriter<T>( + snapshot, + maxBatchSize, + memoryPool, + snapshotTime, + lastContainedLogTimestamp, + CompressionType.NONE, + serde); + }); + writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader); + return writer; + } + + /** + * Returns the end offset and epoch for the snapshot. + */ + public OffsetAndEpoch snapshotId() { + return snapshot.snapshotId(); + } + + /** + * Returns the last log offset which is represented in the snapshot. + */ + public long lastContainedLogOffset() { + return snapshot.snapshotId().offset - 1; + } + + /** + * Returns the epoch of the last log offset which is represented in the snapshot. + */ + public int lastContainedLogEpoch() { + return snapshot.snapshotId().epoch; + } + + /** + * Returns true if the snapshot has been frozen, otherwise false is returned. + * + * Modification to the snapshot are not allowed once it is frozen. + */ + public boolean isFrozen() { Review comment: Add the `@Override` annotation to this method. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.snapshot; + +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.record.ControlRecordUtils; + +import java.util.Optional; +import java.util.List; +import java.util.function.Supplier; + +final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> { + final private RawSnapshotWriter snapshot; + final private BatchAccumulator<T> accumulator; + final private Time time; + final private long lastContainedLogTimestamp; + + private RecordsSnapshotWriter( + RawSnapshotWriter snapshot, + int maxBatchSize, + MemoryPool memoryPool, + Time time, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + this.snapshot = snapshot; + this.time = time; + this.lastContainedLogTimestamp = lastContainedLogTimestamp; + + this.accumulator = new BatchAccumulator<>( + snapshot.snapshotId().epoch, + 0, + Integer.MAX_VALUE, + maxBatchSize, + memoryPool, + time, + compressionType, + serde + ); + } + + /** + * Adds a {@link SnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the snapshot is not empty + */ + public void initializeSnapshotWithHeader() { Review comment: Let's keep this as a private method. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.record.ControlRecordUtils; + +import java.util.Optional; +import java.util.List; +import java.util.function.Supplier; + +final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> { + final private RawSnapshotWriter snapshot; + final private BatchAccumulator<T> accumulator; + final private Time time; + final private long lastContainedLogTimestamp; + + private RecordsSnapshotWriter( + RawSnapshotWriter snapshot, + int maxBatchSize, + MemoryPool memoryPool, + Time time, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + this.snapshot = snapshot; + this.time = time; + this.lastContainedLogTimestamp = lastContainedLogTimestamp; + + 
this.accumulator = new BatchAccumulator<>( + snapshot.snapshotId().epoch, + 0, + Integer.MAX_VALUE, + maxBatchSize, + memoryPool, + time, + compressionType, + serde + ); + } + + /** + * Adds a {@link SnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the snapshot is not empty + */ + public void initializeSnapshotWithHeader() { + if (snapshot.sizeInBytes() != 0) { + String message = String.format( + "Initializing writer with a non-empty snapshot: id = '%s'.", + snapshot.snapshotId() + ); + throw new IllegalStateException(message); + } + + SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION) + .setLastContainedLogTimestamp(lastContainedLogTimestamp); + accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Adds a {@link SnapshotFooterRecord} to the snapshot + * + * No more records should be appended to the snapshot after calling this method + */ + private void finalizeSnapshotWithFooter() { + SnapshotFooterRecord footerRecord = new SnapshotFooterRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION); + accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Create an instance of this class and initialize + * the underlying snapshot with {@link SnapshotHeaderRecord} + * + * @param snapshot a lambda to create the low level snapshot writer + * @param maxBatchSize the maximum size in byte for a batch + * @param memoryPool the memory pool for buffer allocation + * @param time the clock implementation + * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot + * @param compressionType the compression algorithm to use + * @param serde the record serialization and deserialization implementation + * @return {@link Optional}{@link RecordsSnapshotWriter} + */ + public static <T> 
Optional<SnapshotWriter<T>> createWithHeader( + Supplier<Optional<RawSnapshotWriter>> supplier, + int maxBatchSize, + MemoryPool memoryPool, + Time snapshotTime, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> { + return new RecordsSnapshotWriter<T>( + snapshot, + maxBatchSize, + memoryPool, + snapshotTime, + lastContainedLogTimestamp, + CompressionType.NONE, + serde); + }); + writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader); + return writer; + } + + /** + * Returns the end offset and epoch for the snapshot. + */ + public OffsetAndEpoch snapshotId() { + return snapshot.snapshotId(); + } + + /** + * Returns the last log offset which is represented in the snapshot. + */ + public long lastContainedLogOffset() { + return snapshot.snapshotId().offset - 1; + } + + /** + * Returns the epoch of the last log offset which is represented in the snapshot. + */ + public int lastContainedLogEpoch() { + return snapshot.snapshotId().epoch; + } + + /** + * Returns true if the snapshot has been frozen, otherwise false is returned. + * + * Modification to the snapshot are not allowed once it is frozen. + */ + public boolean isFrozen() { + return snapshot.isFrozen(); + } + + /** + * Appends a list of values to the snapshot. + * + * The list of record passed are guaranteed to get written together. + * + * @param records the list of records to append to the snapshot + * @throws IllegalStateException if append is called when isFrozen is true + */ + public void append(List<T> records) { + if (snapshot.isFrozen()) { + String message = String.format( + "Append not supported. 
Snapshot is already frozen: id = '%s'.", + snapshot.snapshotId() + ); + + throw new IllegalStateException(message); + } + + accumulator.append(snapshot.snapshotId().epoch, records); + + if (accumulator.needsDrain(time.milliseconds())) { + appendBatches(accumulator.drain()); + } + } + + /** + * Freezes the snapshot by flushing all pending writes and marking it as immutable. + * + * Also adds a {@link SnapshotFooterRecord} to the end of the snapshot + */ + public void freeze() { Review comment: Add the `@Override` annotation to this method. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.snapshot; + +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.record.ControlRecordUtils; + +import java.util.Optional; +import java.util.List; +import java.util.function.Supplier; + +final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> { + final private RawSnapshotWriter snapshot; + final private BatchAccumulator<T> accumulator; + final private Time time; + final private long lastContainedLogTimestamp; + + private RecordsSnapshotWriter( + RawSnapshotWriter snapshot, + int maxBatchSize, + MemoryPool memoryPool, + Time time, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + this.snapshot = snapshot; + this.time = time; + this.lastContainedLogTimestamp = lastContainedLogTimestamp; + + this.accumulator = new BatchAccumulator<>( + snapshot.snapshotId().epoch, + 0, + Integer.MAX_VALUE, + maxBatchSize, + memoryPool, + time, + compressionType, + serde + ); + } + + /** + * Adds a {@link SnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the snapshot is not empty + */ Review comment: Please remove the documentation from all of the methods that override methods already documented in `SnapshotWriter`. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.OptionalLong; + +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.RecordsIterator; + +public final class RecordsSnapshotReader<T> implements SnapshotReader<T> { + private final OffsetAndEpoch snapshotId; + private final RecordsIterator<T> iterator; + + private Optional<Batch<T>> nextBatch = Optional.empty(); + private OptionalLong lastContainedLogTimestamp = OptionalLong.empty(); + + private RecordsSnapshotReader( + OffsetAndEpoch snapshotId, + RecordsIterator<T> iterator + ) { + this.snapshotId = snapshotId; + this.iterator = iterator; + } + + /** + * Returns the end offset and epoch for the snapshot. + */ + public OffsetAndEpoch snapshotId() { Review comment: Add the `@Override` annotation to this method. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements.
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.record.ControlRecordUtils; + +import java.util.Optional; +import java.util.List; +import java.util.function.Supplier; + +final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> { + final private RawSnapshotWriter snapshot; + final private BatchAccumulator<T> accumulator; + final private Time time; + final private long lastContainedLogTimestamp; + + private RecordsSnapshotWriter( + RawSnapshotWriter snapshot, + int maxBatchSize, + MemoryPool memoryPool, + Time time, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + this.snapshot = snapshot; + this.time = time; + this.lastContainedLogTimestamp = lastContainedLogTimestamp; + + 
this.accumulator = new BatchAccumulator<>( + snapshot.snapshotId().epoch, + 0, + Integer.MAX_VALUE, + maxBatchSize, + memoryPool, + time, + compressionType, + serde + ); + } + + /** + * Adds a {@link SnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the snapshot is not empty + */ + public void initializeSnapshotWithHeader() { + if (snapshot.sizeInBytes() != 0) { + String message = String.format( + "Initializing writer with a non-empty snapshot: id = '%s'.", + snapshot.snapshotId() + ); + throw new IllegalStateException(message); + } + + SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION) + .setLastContainedLogTimestamp(lastContainedLogTimestamp); + accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Adds a {@link SnapshotFooterRecord} to the snapshot + * + * No more records should be appended to the snapshot after calling this method + */ + private void finalizeSnapshotWithFooter() { + SnapshotFooterRecord footerRecord = new SnapshotFooterRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION); + accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Create an instance of this class and initialize + * the underlying snapshot with {@link SnapshotHeaderRecord} + * + * @param snapshot a lambda to create the low level snapshot writer + * @param maxBatchSize the maximum size in byte for a batch + * @param memoryPool the memory pool for buffer allocation + * @param time the clock implementation + * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot + * @param compressionType the compression algorithm to use + * @param serde the record serialization and deserialization implementation + * @return {@link Optional}{@link RecordsSnapshotWriter} + */ + public static <T> 
Optional<SnapshotWriter<T>> createWithHeader( + Supplier<Optional<RawSnapshotWriter>> supplier, + int maxBatchSize, + MemoryPool memoryPool, + Time snapshotTime, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> { + return new RecordsSnapshotWriter<T>( + snapshot, + maxBatchSize, + memoryPool, + snapshotTime, + lastContainedLogTimestamp, + CompressionType.NONE, + serde); + }); + writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader); + return writer; Review comment: When you make `initializeSnapshotWithHeader` private, you may need to slightly change this implementation. E.g.: ```java return supplier.get().map(snapshot -> { RecordsSnapshotWriter<T> writer = new RecordsSnapshotWriter<>( snapshot, maxBatchSize, memoryPool, snapshotTime, lastContainedLogTimestamp, CompressionType.NONE, serde); writer.initializeSnapshotWithHeader(); return writer; }); ``` ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotWriter.java ########## @@ -0,0 +1,221 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.snapshot; + +import org.apache.kafka.common.memory.MemoryPool; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.utils.Time; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.BatchAccumulator; +import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch; +import org.apache.kafka.common.message.SnapshotHeaderRecord; +import org.apache.kafka.common.message.SnapshotFooterRecord; +import org.apache.kafka.common.record.ControlRecordUtils; + +import java.util.Optional; +import java.util.List; +import java.util.function.Supplier; + +final public class RecordsSnapshotWriter<T> implements SnapshotWriter<T> { + final private RawSnapshotWriter snapshot; + final private BatchAccumulator<T> accumulator; + final private Time time; + final private long lastContainedLogTimestamp; + + private RecordsSnapshotWriter( + RawSnapshotWriter snapshot, + int maxBatchSize, + MemoryPool memoryPool, + Time time, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + this.snapshot = snapshot; + this.time = time; + this.lastContainedLogTimestamp = lastContainedLogTimestamp; + + this.accumulator = new BatchAccumulator<>( + snapshot.snapshotId().epoch, + 0, + Integer.MAX_VALUE, + maxBatchSize, + memoryPool, + time, + compressionType, + serde + ); + } + + /** + * Adds a {@link SnapshotHeaderRecord} to snapshot + * + * @throws IllegalStateException if the snapshot is not empty + */ + public void initializeSnapshotWithHeader() { + if (snapshot.sizeInBytes() != 0) { + String message = String.format( + "Initializing writer with a non-empty snapshot: id = '%s'.", + snapshot.snapshotId() + ); + throw new IllegalStateException(message); + } + + SnapshotHeaderRecord headerRecord = new SnapshotHeaderRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_HEADER_HIGHEST_VERSION) + 
.setLastContainedLogTimestamp(lastContainedLogTimestamp); + accumulator.appendSnapshotHeaderMessage(headerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Adds a {@link SnapshotFooterRecord} to the snapshot + * + * No more records should be appended to the snapshot after calling this method + */ + private void finalizeSnapshotWithFooter() { + SnapshotFooterRecord footerRecord = new SnapshotFooterRecord() + .setVersion(ControlRecordUtils.SNAPSHOT_FOOTER_HIGHEST_VERSION); + accumulator.appendSnapshotFooterMessage(footerRecord, time.milliseconds()); + accumulator.forceDrain(); + } + + /** + * Create an instance of this class and initialize + * the underlying snapshot with {@link SnapshotHeaderRecord} + * + * @param snapshot a lambda to create the low level snapshot writer + * @param maxBatchSize the maximum size in byte for a batch + * @param memoryPool the memory pool for buffer allocation + * @param time the clock implementation + * @param lastContainedLogTimestamp The append time of the highest record contained in this snapshot + * @param compressionType the compression algorithm to use + * @param serde the record serialization and deserialization implementation + * @return {@link Optional}{@link RecordsSnapshotWriter} + */ + public static <T> Optional<SnapshotWriter<T>> createWithHeader( + Supplier<Optional<RawSnapshotWriter>> supplier, + int maxBatchSize, + MemoryPool memoryPool, + Time snapshotTime, + long lastContainedLogTimestamp, + CompressionType compressionType, + RecordSerde<T> serde + ) { + Optional<SnapshotWriter<T>> writer = supplier.get().map(snapshot -> { + return new RecordsSnapshotWriter<T>( + snapshot, + maxBatchSize, + memoryPool, + snapshotTime, + lastContainedLogTimestamp, + CompressionType.NONE, + serde); + }); + writer.ifPresent(SnapshotWriter::initializeSnapshotWithHeader); + return writer; + } + + /** + * Returns the end offset and epoch for the snapshot. 
+ */ + public OffsetAndEpoch snapshotId() { + return snapshot.snapshotId(); + } + + /** + * Returns the last log offset which is represented in the snapshot. + */ + public long lastContainedLogOffset() { + return snapshot.snapshotId().offset - 1; + } + + /** + * Returns the epoch of the last log offset which is represented in the snapshot. + */ + public int lastContainedLogEpoch() { Review comment: Add the `@Override` annotation to this method. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.snapshot; + +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.OptionalLong; + +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.RecordsIterator; + +public final class RecordsSnapshotReader<T> implements SnapshotReader<T> { + private final OffsetAndEpoch snapshotId; + private final RecordsIterator<T> iterator; + + private Optional<Batch<T>> nextBatch = Optional.empty(); + private OptionalLong lastContainedLogTimestamp = OptionalLong.empty(); + + private RecordsSnapshotReader( + OffsetAndEpoch snapshotId, + RecordsIterator<T> iterator + ) { + this.snapshotId = snapshotId; + this.iterator = iterator; + } + + /** + * Returns the end offset and epoch for the snapshot. + */ + public OffsetAndEpoch snapshotId() { + return snapshotId; + } + + /** + * Returns the last log offset which is represented in the snapshot. + */ + public long lastContainedLogOffset() { + return snapshotId.offset - 1; + } + + /** + * Returns the epoch of the last log offset which is represented in the snapshot. + */ + public int lastContainedLogEpoch() { + return snapshotId.epoch; + } + + /** + * Returns the timestamp of the last log offset which is represented in the snapshot. + */ + public long lastContainedLogTimestamp() { Review comment: Add the `@Override` annotation to this method. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.OptionalLong; + +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.RecordsIterator; + +public final class RecordsSnapshotReader<T> implements SnapshotReader<T> { + private final OffsetAndEpoch snapshotId; + private final RecordsIterator<T> iterator; + + private Optional<Batch<T>> nextBatch = Optional.empty(); + private OptionalLong lastContainedLogTimestamp = OptionalLong.empty(); + + private RecordsSnapshotReader( + OffsetAndEpoch snapshotId, + RecordsIterator<T> iterator + ) { + this.snapshotId = snapshotId; + this.iterator = iterator; + } + + /** + * Returns the end offset and epoch for the snapshot. + */ + public OffsetAndEpoch snapshotId() { + return snapshotId; + } + + /** + * Returns the last log offset which is represented in the snapshot. + */ + public long lastContainedLogOffset() { + return snapshotId.offset - 1; + } + + /** + * Returns the epoch of the last log offset which is represented in the snapshot. + */ + public int lastContainedLogEpoch() { Review comment: Add the `@Override` annotation to this method.
########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.OptionalLong; + +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.RecordsIterator; + +public final class RecordsSnapshotReader<T> implements SnapshotReader<T> { + private final OffsetAndEpoch snapshotId; + private final RecordsIterator<T> iterator; + + private Optional<Batch<T>> nextBatch = Optional.empty(); + private OptionalLong lastContainedLogTimestamp = OptionalLong.empty(); + + private RecordsSnapshotReader( + OffsetAndEpoch snapshotId, + RecordsIterator<T> iterator + ) { + this.snapshotId = snapshotId; + this.iterator = iterator; + } + + /** + * Returns the end offset and epoch for the snapshot. 
+ */ + public OffsetAndEpoch snapshotId() { + return snapshotId; + } + + /** + * Returns the last log offset which is represented in the snapshot. + */ + public long lastContainedLogOffset() { Review comment: Add the `@Override` annotation to this method. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.snapshot; + +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.OptionalLong; + +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.RecordsIterator; + +public final class RecordsSnapshotReader<T> implements SnapshotReader<T> { + private final OffsetAndEpoch snapshotId; + private final RecordsIterator<T> iterator; + + private Optional<Batch<T>> nextBatch = Optional.empty(); + private OptionalLong lastContainedLogTimestamp = OptionalLong.empty(); + + private RecordsSnapshotReader( + OffsetAndEpoch snapshotId, + RecordsIterator<T> iterator + ) { + this.snapshotId = snapshotId; + this.iterator = iterator; + } + + /** + * Returns the end offset and epoch for the snapshot. + */ Review comment: Please remove the documentation from all of the methods that override methods already documented in `SnapshotReader`. ########## File path: raft/src/main/java/org/apache/kafka/snapshot/RecordsSnapshotReader.java ########## @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.snapshot; + +import java.util.NoSuchElementException; +import java.util.Optional; +import java.util.OptionalLong; + +import org.apache.kafka.common.utils.BufferSupplier; +import org.apache.kafka.raft.Batch; +import org.apache.kafka.raft.OffsetAndEpoch; +import org.apache.kafka.server.common.serialization.RecordSerde; +import org.apache.kafka.raft.internals.RecordsIterator; + +public final class RecordsSnapshotReader<T> implements SnapshotReader<T> { + private final OffsetAndEpoch snapshotId; + private final RecordsIterator<T> iterator; + + private Optional<Batch<T>> nextBatch = Optional.empty(); + private OptionalLong lastContainedLogTimestamp = OptionalLong.empty(); + + private RecordsSnapshotReader( + OffsetAndEpoch snapshotId, + RecordsIterator<T> iterator + ) { + this.snapshotId = snapshotId; + this.iterator = iterator; + } + + /** + * Returns the end offset and epoch for the snapshot. + */ + public OffsetAndEpoch snapshotId() { + return snapshotId; + } + + /** + * Returns the last log offset which is represented in the snapshot. + */ + public long lastContainedLogOffset() { + return snapshotId.offset - 1; + } + + /** + * Returns the epoch of the last log offset which is represented in the snapshot. + */ + public int lastContainedLogEpoch() { + return snapshotId.epoch; + } + + /** + * Returns the timestamp of the last log offset which is represented in the snapshot. 
+ */ + public long lastContainedLogTimestamp() { + if (!lastContainedLogTimestamp.isPresent()) { + nextBatch.ifPresent(batch -> { + throw new IllegalStateException( + String.format( + "nextBatch was present when last contained log timestamp was not present", + batch + ) + ); + }); + nextBatch = nextBatch(); + } + + return lastContainedLogTimestamp.getAsLong(); + } + + @Override + public boolean hasNext() { + if (!nextBatch.isPresent()) { + nextBatch = nextBatch(); + } + + return nextBatch.isPresent(); + } + + @Override + public Batch<T> next() { + if (!hasNext()) { + throw new NoSuchElementException("Snapshot reader doesn't have any more elements"); + } + + Batch<T> batch = nextBatch.get(); + nextBatch = Optional.empty(); + + return batch; + } + + /** + * Closes the snapshot reader. + */ + public void close() { Review comment: Add the `@Override` annotation to this method. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org