chia7712 commented on code in PR #21644:
URL: https://github.com/apache/kafka/pull/21644#discussion_r2891390374


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java:
##########
@@ -146,16 +154,103 @@ public static MemoryRecords records(List<SimpleRecord> 
records,
                                         short producerEpoch,
                                         int sequence,
                                         long baseOffset,
-                                        int partitionLeaderEpoch) {
+                                        int partitionLeaderEpoch,
+                                        long timestamp) {
         ByteBuffer buf = 
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records));
         MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, 
codec, TimestampType.CREATE_TIME, baseOffset,
-            System.currentTimeMillis(), producerId, producerEpoch, sequence, 
false, partitionLeaderEpoch);
+            timestamp, producerId, producerEpoch, sequence, false, 
partitionLeaderEpoch);
 
         records.forEach(builder::append);
 
         return builder.build();
     }
 
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        byte magicValue,
+                                        Compression codec,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset,
+                                        int partitionLeaderEpoch) {
+        return records(records, magicValue, codec, producerId, producerEpoch, 
sequence, baseOffset, partitionLeaderEpoch, System.currentTimeMillis());
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, producerId, producerEpoch, sequence, baseOffset, 
RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, long 
timestamp) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH, timestamp);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, long 
baseOffset, int partitionLeaderEpoch) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch);
+    }
+
+    public static void deleteProducerSnapshotFiles(File logDir) throws 
IOException {
+        Set<File> files = Stream.of(logDir.listFiles()).filter(f -> f.isFile() 
&& 
f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)).collect(Collectors.toSet());
+        for (File file : files) {
+            Utils.delete(file);
+        }
+    }
+
+    public static List<Long> listProducerSnapshotOffsets(File logDir) throws 
IOException {
+        return ProducerStateManager.listSnapshotFiles(logDir).stream().map(f 
-> f.offset).sorted().toList();
+    }
+
+    public static void appendNonTransactionalAsLeader(UnifiedLog log, int 
numRecords) throws IOException {
+        List<SimpleRecord> simpleRecords = new ArrayList<>();
+        for (int i = 0; i < numRecords; i++) {
+            simpleRecords.add(new SimpleRecord(String.valueOf(i).getBytes()));
+        }
+        MemoryRecords records = MemoryRecords.withRecords(Compression.NONE, 
simpleRecords.toArray(new SimpleRecord[0]));
+        log.appendAsLeader(records, 0);
+    }
+
+    public static Consumer<Integer> appendTransactionalAsLeader(UnifiedLog log,
+                                                                long 
producerId,
+                                                                short 
producerEpoch,
+                                                                Time time) {
+        return appendIdempotentAsLeader(log, producerId, producerEpoch, time, 
true);
+    }
+
+    public static Consumer<Integer> appendIdempotentAsLeader(UnifiedLog log,
+                                                             long producerId,
+                                                             short 
producerEpoch,
+                                                             Time time,
+                                                             boolean 
isTransactional) {
+        final AtomicInteger sequence = new AtomicInteger(0);
+        return numRecords -> {
+            int baseSequence = sequence.get();
+            List<SimpleRecord> simpleRecords = new ArrayList<>();
+            for (int i = baseSequence; i < baseSequence + numRecords; i++) {
+                simpleRecords.add(new SimpleRecord(time.milliseconds(), 
String.valueOf(i).getBytes()));
+            }
+
+            MemoryRecords records = isTransactional
+                ? MemoryRecords.withTransactionalRecords(Compression.NONE, 
producerId,
+                        producerEpoch, baseSequence, simpleRecords.toArray(new 
SimpleRecord[0]))
+                : MemoryRecords.withIdempotentRecords(Compression.NONE, 
producerId,
+                        producerEpoch, baseSequence, simpleRecords.toArray(new 
SimpleRecord[0]));
+
+            try {

Review Comment:
   ```java
   Assertions.assertDoesNotThrow(() -> log.appendAsLeader(records, 0));
   ```



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LogTestUtils.java:
##########
@@ -146,16 +154,103 @@ public static MemoryRecords records(List<SimpleRecord> 
records,
                                         short producerEpoch,
                                         int sequence,
                                         long baseOffset,
-                                        int partitionLeaderEpoch) {
+                                        int partitionLeaderEpoch,
+                                        long timestamp) {
         ByteBuffer buf = 
ByteBuffer.allocate(DefaultRecordBatch.sizeInBytes(records));
         MemoryRecordsBuilder builder = MemoryRecords.builder(buf, magicValue, 
codec, TimestampType.CREATE_TIME, baseOffset,
-            System.currentTimeMillis(), producerId, producerEpoch, sequence, 
false, partitionLeaderEpoch);
+            timestamp, producerId, producerEpoch, sequence, false, 
partitionLeaderEpoch);
 
         records.forEach(builder::append);
 
         return builder.build();
     }
 
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        byte magicValue,
+                                        Compression codec,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset,
+                                        int partitionLeaderEpoch) {
+        return records(records, magicValue, codec, producerId, producerEpoch, 
sequence, baseOffset, partitionLeaderEpoch, System.currentTimeMillis());
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records,
+                                        long producerId,
+                                        short producerEpoch,
+                                        int sequence,
+                                        long baseOffset) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, producerId, producerEpoch, sequence, baseOffset, 
RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, long 
timestamp) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, 0L, RecordBatch.NO_PARTITION_LEADER_EPOCH, timestamp);
+    }
+
+    public static MemoryRecords records(List<SimpleRecord> records, long 
baseOffset, int partitionLeaderEpoch) {
+        return records(records, RecordBatch.CURRENT_MAGIC_VALUE, 
Compression.NONE, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, 
RecordBatch.NO_SEQUENCE, baseOffset, partitionLeaderEpoch);
+    }
+
+    public static void deleteProducerSnapshotFiles(File logDir) throws 
IOException {
+        Set<File> files = Stream.of(logDir.listFiles()).filter(f -> f.isFile() 
&& 
f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX)).collect(Collectors.toSet());

Review Comment:
   ```java
       public static void deleteProducerSnapshotFiles(File logDir) {
           Stream.of(Objects.requireNonNullElse(logDir.listFiles(), new 
File[0]))
               .filter(f -> f.isFile() && 
f.getName().endsWith(LogFileUtils.PRODUCER_SNAPSHOT_FILE_SUFFIX))
               .forEach(f -> Assertions.assertDoesNotThrow(() -> 
Utils.delete(f)));
       }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to