mimaison commented on code in PR #13278:
URL: https://github.com/apache/kafka/pull/13278#discussion_r1266857754


##########
tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.CoreUtils;
+import org.apache.kafka.server.util.Json;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * A command for delete records of the given partitions down to the specified 
offset.
+ */
+public class DeleteRecordsCommand {
+    private static final int EARLIEST_VERSION = 1;
+
+    public static void main(String[] args) throws Exception {
+        execute(args, System.out);
+    }
+
+    static Collection<Tuple<TopicPartition, Long>> 
parseOffsetJsonStringWithoutDedup(String jsonData) {
+        try {
+            JsonNode js = Json.tryParseFull(jsonData).node();
+
+            int version = EARLIEST_VERSION;
+
+            if (js.has("version"))
+                version = js.get("version").asInt();
+
+            return parseJsonData(version, js);
+        } catch (JsonProcessingException e) {
+            throw new AdminOperationException("The input string is not a valid 
JSON");
+        }
+    }
+
+    private static Collection<Tuple<TopicPartition, Long>> parseJsonData(int 
version, JsonNode js) throws JsonMappingException {
+        if (version == 1) {
+            JsonNode partitions = js.get("partitions");
+
+            if (partitions == null || !partitions.isArray())
+                throw new AdminOperationException("Missing partitions field");
+
+            Collection<Tuple<TopicPartition, Long>> res = new ArrayList<>();
+
+            for (JsonNode partitionJs : partitions) {
+                String topic = getOrThrow(partitionJs, "topic").asText();
+                int partition = getOrThrow(partitionJs, "partition").asInt();
+                long offset = getOrThrow(partitionJs, "offset").asLong();
+
+                res.add(new Tuple<>(new TopicPartition(topic, partition), 
offset));
+            }
+
+            return res;
+        }
+
+        throw new AdminOperationException("Not supported version field value " 
+ version);
+    }
+
+    public static void execute(String[] args, PrintStream out) throws 
IOException {
+        DeleteRecordsCommandOptions opts = new 
DeleteRecordsCommandOptions(args);
+
+        try (Admin adminClient = createAdminClient(opts)) {
+            execute0(adminClient, 
Utils.readFileAsString(opts.options.valueOf(opts.offsetJsonFileOpt)), out);
+        }
+    }
+
+    static void execute0(Admin adminClient, String offsetJsonString, 
PrintStream out) {

Review Comment:
   Could this just be `execute()`?



##########
server-common/src/main/java/org/apache/kafka/server/common/AdminCommandFailedException.java:
##########
@@ -15,9 +15,18 @@
  * limitations under the License.
  */
 
-package kafka.admin
+package org.apache.kafka.server.common;
 
-class AdminOperationException(val error: String, cause: Throwable) extends 
RuntimeException(error, cause) {
-  def this(error: Throwable) = this(error.getMessage, error)
-  def this(msg: String) = this(msg, null)
-}
\ No newline at end of file
+public class AdminCommandFailedException extends RuntimeException {
+    public AdminCommandFailedException(String message) {
+        super(message);
+    }
+
+    public AdminCommandFailedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public AdminCommandFailedException() {

Review Comment:
   Do we need this constructor?



##########
checkstyle/import-control.xml:
##########
@@ -295,6 +295,7 @@
 
   <subpackage name="tools">
     <allow pkg="org.apache.kafka.common"/>
+    <allow pkg="org.apache.kafka.server.common" />

Review Comment:
   We already allow that package just below.



##########
server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * General helper functions!
+ *
+ * This is for general helper functions that aren't specific to Kafka logic. 
Things that should have been included in
+ * the standard library etc.
+ *
+ * If you are making a new helper function and want to add it to this class 
please ensure the following:
+ * 1. It has documentation
+ * 2. It is the most general possible utility, not just the thing you needed 
in one particular place
+ * 3. You have tests for it if it is nontrivial in any way
+ */
+public class CoreUtils {
+    /**
+     * Returns a list of duplicated items
+     */
+    public static <T> Iterable<T> duplicates(Iterable<T> s) {
+        return StreamSupport.stream(s.spliterator(), false)

Review Comment:
   I think it may be one of these cases where a non-stream algorithm might be 
more readable. Something like:
   ```
   public static <T> Set<T> duplicates2(List<T> set) {
       Set<T> elements = new HashSet<>();
       return set.stream()
               .filter(n -> !elements.add(n))
               .collect(Collectors.toSet());
   }
   ```



##########
server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * General helper functions!
+ *
+ * This is for general helper functions that aren't specific to Kafka logic. 
Things that should have been included in
+ * the standard library etc.
+ *
+ * If you are making a new helper function and want to add it to this class 
please ensure the following:
+ * 1. It has documentation
+ * 2. It is the most general possible utility, not just the thing you needed 
in one particular place
+ * 3. You have tests for it if it is nontrivial in any way
+ */
+public class CoreUtils {
+    /**
+     * Returns a list of duplicated items
+     */
+    public static <T> Iterable<T> duplicates(Iterable<T> s) {

Review Comment:
   I wonder if we should return `Set` instead of `Iterable`. The Java 
`Iterable` is a lot less flexible than its Scala counterpart.



##########
tools/src/main/java/org/apache/kafka/tools/DeleteRecordsCommand.java:
##########
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonMappingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import joptsimple.OptionSpec;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.DeleteRecordsResult;
+import org.apache.kafka.clients.admin.RecordsToDelete;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.apache.kafka.server.util.CommandDefaultOptions;
+import org.apache.kafka.server.util.CommandLineUtils;
+import org.apache.kafka.server.util.CoreUtils;
+import org.apache.kafka.server.util.Json;
+
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.StringJoiner;
+import java.util.concurrent.ExecutionException;
+import java.util.stream.Collectors;
+
+/**
+ * A command for delete records of the given partitions down to the specified 
offset.
+ */
+public class DeleteRecordsCommand {
+    private static final int EARLIEST_VERSION = 1;
+
+    public static void main(String[] args) throws Exception {
+        execute(args, System.out);
+    }
+
+    static Collection<Tuple<TopicPartition, Long>> 
parseOffsetJsonStringWithoutDedup(String jsonData) {
+        try {
+            JsonNode js = Json.tryParseFull(jsonData).node();
+
+            int version = EARLIEST_VERSION;
+
+            if (js.has("version"))
+                version = js.get("version").asInt();
+
+            return parseJsonData(version, js);
+        } catch (JsonProcessingException e) {
+            throw new AdminOperationException("The input string is not a valid 
JSON");
+        }
+    }
+
+    private static Collection<Tuple<TopicPartition, Long>> parseJsonData(int 
version, JsonNode js) throws JsonMappingException {

Review Comment:
   We now have `JsonValue` in `server-common`, so can we use that (if it makes 
sense), like the original logic did?



##########
server-common/src/main/java/org/apache/kafka/server/util/CoreUtils.java:
##########
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import java.util.Map;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+/**
+ * General helper functions!
+ *
+ * This is for general helper functions that aren't specific to Kafka logic. 
Things that should have been included in
+ * the standard library etc.
+ *
+ * If you are making a new helper function and want to add it to this class 
please ensure the following:
+ * 1. It has documentation
+ * 2. It is the most general possible utility, not just the thing you needed 
in one particular place
+ * 3. You have tests for it if it is nontrivial in any way
+ */
+public class CoreUtils {
+    /**
+     * Returns a list of duplicated items
+     */
+    public static <T> Iterable<T> duplicates(Iterable<T> s) {

Review Comment:
   Also, I'm not sure why this goes into the `server-common` module. The original 
`duplicates()` method seems to only be called by tools, so maybe we can put 
this into `tools`?



##########
tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL)
+@Tag("integration")
+public class DeleteRecordsCommandTest {
+
+    private final ClusterInstance cluster;
+    public DeleteRecordsCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @ClusterTest
+    public void testCommandZk() throws Exception {
+        Properties adminProps = new Properties();
+
+        adminProps.put(AdminClientConfig.RETRIES_CONFIG, 1);
+
+        try (Admin admin = cluster.createAdminClient(adminProps)) {
+            assertThrows(
+                AdminCommandFailedException.class,
+                () -> DeleteRecordsCommand.execute0(admin, "{\"partitions\":[" 
+
+                    "{\"topic\":\"t\", \"partition\":0, \"offset\":1}," +
+                    "{\"topic\":\"t\", \"partition\":0, \"offset\":1}]" +
+                    "}", System.out),
+                "Offset json file contains duplicate topic partitions: t-0"
+            );
+
+            admin.createTopics(Collections.singleton(new NewTopic("t", 1, 
(short) 1))).all().get();
+
+            Properties props = new Properties();
+
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+
+            try (KafkaProducer<?, String> producer = new 
KafkaProducer<>(props)) {
+                producer.send(new ProducerRecord<>("t", "1")).get();
+                producer.send(new ProducerRecord<>("t", "2")).get();
+                producer.send(new ProducerRecord<>("t", "3")).get();
+            }
+
+            executeAndAssertOutput(
+                "{\"partitions\":[{\"topic\":\"t\", \"partition\":0, 
\"offset\":1}]}",
+                "partition: t-0\tlow_watermark: 1",
+                admin
+            );
+
+            executeAndAssertOutput(
+                "{\"partitions\":[{\"topic\":\"t\", \"partition\":42, 
\"offset\":42}]}",
+                "partition: t-42\terror",
+                admin
+            );
+        }
+    }
+
+    private static void executeAndAssertOutput(String json, String expOut, 
Admin admin) {
+        String output =
+            ToolsTestUtils.captureStandardOut(() -> 
DeleteRecordsCommand.execute0(admin, json, System.out));
+        assertTrue(output.contains(expOut));
+    }
+}
+
+/**
+ * Unit test of {@link DeleteRecordsCommand} tool.
+ */
+class DeleteRecordsCommandUnitTest {
+    @Test
+    public void testOffsetFileNotExists() {
+        assertThrows(IOException.class, () -> DeleteRecordsCommand.main(new 
String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--offset-json-file", "/not/existing/file"
+        }));
+    }
+
+    @Test
+    public void testCommandConfigNotExists() {
+        assertThrows(NoSuchFileException.class, () -> 
DeleteRecordsCommand.main(new String[] {

Review Comment:
   It's strange that the other test above has a different exception for a 
similar condition (file does not exist). Is this similar in the original tool?



##########
server-common/src/test/java/org/apache/kafka/server/util/CoreUtilsTest.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.util;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class CoreUtilsTest {
+    @Test
+    public void testDuplicates() {
+        assertIterableEquals(

Review Comment:
   The expected value should be the first argument. Same below.



##########
tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL)
+@Tag("integration")
+public class DeleteRecordsCommandTest {
+
+    private final ClusterInstance cluster;
+    public DeleteRecordsCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @ClusterTest
+    public void testCommandZk() throws Exception {
+        Properties adminProps = new Properties();
+
+        adminProps.put(AdminClientConfig.RETRIES_CONFIG, 1);
+
+        try (Admin admin = cluster.createAdminClient(adminProps)) {
+            assertThrows(
+                AdminCommandFailedException.class,
+                () -> DeleteRecordsCommand.execute0(admin, "{\"partitions\":[" 
+
+                    "{\"topic\":\"t\", \"partition\":0, \"offset\":1}," +
+                    "{\"topic\":\"t\", \"partition\":0, \"offset\":1}]" +
+                    "}", System.out),
+                "Offset json file contains duplicate topic partitions: t-0"
+            );
+
+            admin.createTopics(Collections.singleton(new NewTopic("t", 1, 
(short) 1))).all().get();
+
+            Properties props = new Properties();
+
+            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
cluster.bootstrapServers());
+            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class);
+
+            try (KafkaProducer<?, String> producer = new 
KafkaProducer<>(props)) {
+                producer.send(new ProducerRecord<>("t", "1")).get();
+                producer.send(new ProducerRecord<>("t", "2")).get();
+                producer.send(new ProducerRecord<>("t", "3")).get();
+            }
+
+            executeAndAssertOutput(
+                "{\"partitions\":[{\"topic\":\"t\", \"partition\":0, 
\"offset\":1}]}",
+                "partition: t-0\tlow_watermark: 1",
+                admin
+            );
+
+            executeAndAssertOutput(
+                "{\"partitions\":[{\"topic\":\"t\", \"partition\":42, 
\"offset\":42}]}",
+                "partition: t-42\terror",
+                admin
+            );
+        }
+    }
+
+    private static void executeAndAssertOutput(String json, String expOut, 
Admin admin) {
+        String output =
+            ToolsTestUtils.captureStandardOut(() -> 
DeleteRecordsCommand.execute0(admin, json, System.out));
+        assertTrue(output.contains(expOut));
+    }
+}
+
+/**
+ * Unit test of {@link DeleteRecordsCommand} tool.
+ */
+class DeleteRecordsCommandUnitTest {
+    @Test
+    public void testOffsetFileNotExists() {
+        assertThrows(IOException.class, () -> DeleteRecordsCommand.main(new 
String[]{
+            "--bootstrap-server", "localhost:9092",
+            "--offset-json-file", "/not/existing/file"
+        }));
+    }
+
+    @Test
+    public void testCommandConfigNotExists() {
+        assertThrows(NoSuchFileException.class, () -> 
DeleteRecordsCommand.main(new String[] {
+            "--bootstrap-server", "localhost:9092",
+            "--offset-json-file", "/not/existing/file",
+            "--command-config", "/another/not/existing/file"
+        }));
+    }
+
+    @Test
+    public void testWrongVersion() {
+        assertThrowsAdminOperationException("{\"version\":\"string\"}");
+        assertThrowsAdminOperationException("{\"version\":2}");
+    }
+
+    @Test
+    public void testWrongPartitions() {
+        assertThrowsAdminOperationException("{\"version\":1}");
+        assertThrowsAdminOperationException("{\"partitions\":2}");
+        assertThrowsAdminOperationException("{\"partitions\":{}}");
+        assertThrowsAdminOperationException("{\"partitions\":[{}]}");
+        
assertThrowsAdminOperationException("{\"partitions\":[{\"topic\":\"t\"}]}");
+        
assertThrowsAdminOperationException("{\"partitions\":[{\"topic\":\"t\", 
\"partition\": \"\"}]}");
+        
assertThrowsAdminOperationException("{\"partitions\":[{\"topic\":\"t\", 
\"partition\": 0}]}");
+        
assertThrowsAdminOperationException("{\"partitions\":[{\"topic\":\"t\", 
\"offset\":0}]}");

Review Comment:
   What happens if there are extra fields?



##########
tools/src/test/java/org/apache/kafka/tools/DeleteRecordsCommandTest.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import kafka.test.ClusterInstance;
+import kafka.test.annotation.ClusterTest;
+import kafka.test.annotation.ClusterTestDefaults;
+import kafka.test.annotation.Type;
+import kafka.test.junit.ClusterTestExtensions;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.server.common.AdminCommandFailedException;
+import org.apache.kafka.server.common.AdminOperationException;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+
+import java.io.IOException;
+import java.nio.file.NoSuchFileException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Properties;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+@ExtendWith(value = ClusterTestExtensions.class)
+@ClusterTestDefaults(clusterType = Type.ALL)
+@Tag("integration")
+public class DeleteRecordsCommandTest {
+
+    private final ClusterInstance cluster;
+    public DeleteRecordsCommandTest(ClusterInstance cluster) {
+        this.cluster = cluster;
+    }
+
+    @ClusterTest
+    public void testCommandZk() throws Exception {

Review Comment:
   Why is it called `testCommandZk`? We set `clusterType` to `ALL` above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to