chia7712 commented on code in PR #18057:
URL: https://github.com/apache/kafka/pull/18057#discussion_r1873793851


##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java:
##########
@@ -0,0 +1,785 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+class LocalLogTest {
+
+    private static final MockTime MOCK_TIME = new MockTime();
+
+    private final File tmpDir = TestUtils.tempDirectory();
+    private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+    private final TopicPartition topicPartition = new 
TopicPartition("test_topic", 1);
+    private final LogDirFailureChannel logDirFailureChannel = new 
LogDirFailureChannel(10);
+
+    private LocalLog log;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        log = createLocalLogWithActiveSegment(logDir, new LogConfig(new 
Properties()));
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        try {
+            log.close();
+        } catch (KafkaStorageException kse) {
+            // ignore
+        }
+        Utils.delete(tmpDir);

Review Comment:
   These deletions are redundant: the helper that creates the temporary directory (`TestUtils.tempDirectory()`) already handles the cleanup.



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java:
##########
@@ -0,0 +1,785 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+class LocalLogTest {
+
+    private static final MockTime MOCK_TIME = new MockTime();
+
+    private final File tmpDir = TestUtils.tempDirectory();
+    private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+    private final TopicPartition topicPartition = new 
TopicPartition("test_topic", 1);
+    private final LogDirFailureChannel logDirFailureChannel = new 
LogDirFailureChannel(10);
+
+    private LocalLog log;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        log = createLocalLogWithActiveSegment(logDir, new LogConfig(new 
Properties()));
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        try {
+            log.close();
+        } catch (KafkaStorageException kse) {
+            // ignore
+        }
+        Utils.delete(tmpDir);
+        Utils.delete(logDir);
+    }
+
+    static class KeyValue {

Review Comment:
   This can be simplified by using a `record` class:
   ```java
       record KeyValue(String key, String value) {
           SimpleRecord toRecord(long timestamp) {
               return new SimpleRecord(timestamp, key.getBytes(), value.getBytes());
           }
   
           SimpleRecord toRecord() {
               return new SimpleRecord(MOCK_TIME.milliseconds(), key.getBytes(), value.getBytes());
           }
   
           static KeyValue fromRecord(Record record) {
               String key = record.hasKey()
                   ? StandardCharsets.UTF_8.decode(record.key()).toString()
                   : "";
               String value = record.hasValue()
                   ? StandardCharsets.UTF_8.decode(record.value()).toString()
                   : "";
               return new KeyValue(key, value);
           }
       }
   ```



##########
storage/src/test/java/org/apache/kafka/storage/internals/log/LocalLogTest.java:
##########
@@ -0,0 +1,785 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.storage.internals.log;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.errors.KafkaStorageException;
+import org.apache.kafka.common.record.MemoryRecords;
+import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.util.MockTime;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.spy;
+
+class LocalLogTest {
+
+    private static final MockTime MOCK_TIME = new MockTime();
+
+    private final File tmpDir = TestUtils.tempDirectory();
+    private final File logDir = TestUtils.randomPartitionLogDir(tmpDir);
+    private final TopicPartition topicPartition = new 
TopicPartition("test_topic", 1);
+    private final LogDirFailureChannel logDirFailureChannel = new 
LogDirFailureChannel(10);
+
+    private LocalLog log;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        log = createLocalLogWithActiveSegment(logDir, new LogConfig(new 
Properties()));
+    }
+
+    @AfterEach
+    public void tearDown() throws IOException {
+        try {
+            log.close();
+        } catch (KafkaStorageException kse) {
+            // ignore
+        }
+        Utils.delete(tmpDir);
+        Utils.delete(logDir);
+    }
+
+    static class KeyValue {
+        private final String key;
+        private final String value;
+
+        KeyValue(String key, String value) {
+            this.key = key;
+            this.value = value;
+        }
+
+        SimpleRecord toRecord(long timestamp) {
+            return new SimpleRecord(timestamp, key.getBytes(), 
value.getBytes());
+        }
+
+        SimpleRecord toRecord() {
+            return new SimpleRecord(MOCK_TIME.milliseconds(), key.getBytes(), 
value.getBytes());
+        }
+
+        static KeyValue fromRecord(Record record) {
+            String key = record.hasKey()
+                ? StandardCharsets.UTF_8.decode(record.key()).toString()
+                : "";
+            String value = record.hasValue()
+                ? StandardCharsets.UTF_8.decode(record.value()).toString()
+                : "";
+            return new KeyValue(key, value);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+            KeyValue keyValue = (KeyValue) o;
+            return Objects.equals(key, keyValue.key) && Objects.equals(value, 
keyValue.value);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(key, value);
+        }
+    }
+
+    private List<SimpleRecord> kvsToRecords(List<KeyValue> keyValues) {
+        return 
keyValues.stream().map(KeyValue::toRecord).collect(Collectors.toList());
+    }
+
+    private List<KeyValue> recordsToKvs(Iterable<Record> records) {
+        List<KeyValue> keyValues = new ArrayList<>();
+        for (Record record : records) {
+            keyValues.add(KeyValue.fromRecord(record));
+        }
+        return keyValues;
+    }
+
+    private void appendRecords(List<SimpleRecord> records, long initialOffset) 
throws IOException {
+        log.append(initialOffset + records.size() - 1,
+                records.get(0).timestamp(),
+                initialOffset,
+                MemoryRecords.withRecords(initialOffset, Compression.NONE, 0, 
records.toArray(new SimpleRecord[0])));
+    }
+
+    private FetchDataInfo readRecords(long startOffset) throws IOException {
+        return readRecords(
+                startOffset,
+                log.segments().activeSegment().size(),
+                log.logEndOffsetMetadata()
+        );
+    }
+
+    private FetchDataInfo readRecords(int maxLength) throws IOException {
+        return readRecords(
+                0L,
+                maxLength,
+                log.logEndOffsetMetadata()
+        );
+    }
+
+    private FetchDataInfo readRecords(long startOffset, LogOffsetMetadata 
maxOffsetMetadata) throws IOException {
+        return readRecords(
+                startOffset,
+                log.segments().activeSegment().size(),
+                maxOffsetMetadata
+        );
+    }
+
+    private FetchDataInfo readRecords(
+                            long startOffset,
+                            int maxLength,
+                            LogOffsetMetadata maxOffsetMetadata) throws 
IOException {
+        return log.read(startOffset,
+                maxLength,
+                false,
+                maxOffsetMetadata,
+                false);
+    }
+
+    private static List<Record> mkList(Iterable<Record> iterable) {

Review Comment:
   This `mkList` helper can be replaced by `Utils.toList`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to