dajac commented on a change in pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#discussion_r452893782



##########
File path: core/src/main/scala/kafka/admin/LogDirsCommand.scala
##########
@@ -64,7 +63,7 @@ object LogDirsCommand {
                     "logDirs" -> logDirInfos.map { case (logDir, logDirInfo) =>
                         Map(
                             "logDir" -> logDir,
-                            "error" -> logDirInfo.error.exceptionName(),
+                            "error" -> Option(logDirInfo.error).flatMap(ex => 
Some(ex.getClass.getName)).orNull,

Review comment:
       nit: Can't we use `map` instead of `flatMap` and remove the `Some`?

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/ReplicaInfo.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+/**
+ * A description of a replica on a particular broker.
+ */
+public class ReplicaInfo {
+
+    private final long size;
+    private final long offsetLag;
+    private final boolean isFuture;
+
+    public ReplicaInfo(long size, long offsetLag, boolean isFuture) {
+        this.size = size;
+        this.offsetLag = offsetLag;
+        this.isFuture = isFuture;
+    }
+
+    /**
+     * The total size of the log segments in this replica in bytes.
+     */
+    public long size() {
+        return size;
+    }
+
+    /**
+     * The lag of the log's LEO with respect to the partition's
+     * high watermark (if it is the current log for the partition)
+     * or the current replica's LEO (if it is the {@linkplain #isFuture() 
future log}
+     * for the partition).
+     */
+    public long offsetLag() {
+        return offsetLag;
+    }
+
+    /**
+     * Whether this replica has been created by a AlterReplicaLogDirsRequest
+     * but not yet replaced the current replica on the broker.
+     *
+     * @return true if this log is created by AlterReplicaLogDirsRequest and 
will replace the current log
+     * of the replica at some time in the future.
+     */
+    public boolean isFuture() {
+        return isFuture;
+    }
+
+    @Override
+    public String toString() {
+        return "ReplicaInfo{" +
+                "size=" + size +
+                ", offsetLag=" + offsetLag +
+                ", isFuture=" + isFuture +
+                '}';

Review comment:
       nit: curly braces instead of parentheses.

##########
File path: 
core/src/test/scala/unit/kafka/server/DescribeLogDirsRequestTest.scala
##########
@@ -46,15 +48,23 @@ class DescribeLogDirsRequestTest extends BaseRequestTest {
 
     val request = new DescribeLogDirsRequest.Builder(new 
DescribeLogDirsRequestData().setTopics(null)).build()
     val response = connectAndReceive[DescribeLogDirsResponse](request, 
destination = controllerSocketServer)
-    val logDirInfos = response.logDirInfos()
+    case class ReplicaInfo(size: Long, offsetLag: Long, isFuture: Boolean)
+    case class LogDirInfo(error: Errors, replicaInfos: Map[TopicPartition, 
ReplicaInfo])

Review comment:
       It's a pity that we have to redefine these classes here. Couldn't we 
update the test to work with the plain response instead?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2356,32 +2378,32 @@ public DescribeReplicaLogDirsResult 
describeReplicaLogDirs(Collection<TopicParti
                 @Override
                 public void handleResponse(AbstractResponse abstractResponse) {
                     DescribeLogDirsResponse response = 
(DescribeLogDirsResponse) abstractResponse;
-                    for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> 
responseEntry: response.logDirInfos().entrySet()) {
+                    for (Map.Entry<String, LogDirDescription> responseEntry: 
logDirDescriptions(response).entrySet()) {
                         String logDir = responseEntry.getKey();
-                        DescribeLogDirsResponse.LogDirInfo logDirInfo = 
responseEntry.getValue();
+                        LogDirDescription logDirInfo = 
responseEntry.getValue();
 
                         // No replica info will be provided if the log 
directory is offline
-                        if (logDirInfo.error == Errors.KAFKA_STORAGE_ERROR)
+                        if (logDirInfo.error() instanceof 
KafkaStorageException)
                             continue;
-                        if (logDirInfo.error != Errors.NONE)
+                        if (logDirInfo.error() != null)
                             handleFailure(new IllegalStateException(
-                                "The error " + logDirInfo.error + " for log 
directory " + logDir + " in the response from broker " + brokerId + " is 
illegal"));
+                                "The error " + 
logDirInfo.error().getClass().getName() + " for log directory " + logDir + " in 
the response from broker " + brokerId + " is illegal"));
 
-                        for (Map.Entry<TopicPartition, 
DescribeLogDirsResponse.ReplicaInfo> replicaInfoEntry: 
logDirInfo.replicaInfos.entrySet()) {
+                        for (Map.Entry<TopicPartition, ReplicaInfo> 
replicaInfoEntry: logDirInfo.replicaInfos().entrySet()) {
                             TopicPartition tp = replicaInfoEntry.getKey();
-                            DescribeLogDirsResponse.ReplicaInfo replicaInfo = 
replicaInfoEntry.getValue();
+                            ReplicaInfo replicaInfo = 
replicaInfoEntry.getValue();
                             ReplicaLogDirInfo replicaLogDirInfo = 
replicaDirInfoByPartition.get(tp);
                             if (replicaLogDirInfo == null) {
                                 handleFailure(new IllegalStateException(

Review comment:
       Not related to your PR, but this looks weird. It seems that we fail all 
the futures if an unexpected replica is provided by the broker in the response. 
I think that we should log a warning when this happens, like we do in the other 
methods (e.g. createTopics). What do you think?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2308,6 +2312,24 @@ void handleFailure(Throwable throwable) {
         return new DescribeLogDirsResult(new HashMap<>(futures));
     }
 
+    private Map<String, LogDirDescription> 
logDirDescriptions(DescribeLogDirsResponse response) {
+        HashMap<String, LogDirDescription> result = new 
HashMap<>(response.data().results().size());
+        for (DescribeLogDirsResponseData.DescribeLogDirsResult logDirResult : 
response.data().results()) {
+            Map<TopicPartition, ReplicaInfo> replicaInfoMap = new HashMap<>();
+            if (logDirResult.topics() != null) {

Review comment:
       nit: `topics` is not nullable in the protocol, so it should never be 
`null`, should it?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -1055,6 +1059,154 @@ public void testDescribeConfigsUnrequested() throws 
Exception {
         }
     }
 
+    @SuppressWarnings("deprecation")
+    @Test
+    public void testDescribeLogDirs() throws ExecutionException, 
InterruptedException {
+        List<Integer> brokers = singletonList(0);
+        TopicPartition tp = new TopicPartition("topic", 12);
+
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
+            env.kafkaClient().prepareResponseFrom(new DescribeLogDirsResponse(
+                    new 
DescribeLogDirsResponseData().setResults(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsResult()
+                            .setErrorCode(Errors.NONE.code())
+                            .setLogDir("/var/data/kafka")
+                            .setTopics(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsTopic()
+                                    .setName(tp.topic())
+                                    .setPartitions(singletonList(new 
DescribeLogDirsResponseData.DescribeLogDirsPartition()
+                                            .setPartitionIndex(tp.partition())
+                                            .setPartitionSize(1234567890)
+                                            .setIsFutureKey(false)
+                                            .setOffsetLag(0)))))
+                    ))), env.cluster().nodeById(0));
+            DescribeLogDirsResult result = 
env.adminClient().describeLogDirs(brokers);
+            Map<Integer, KafkaFuture<Map<String, LogDirDescription>>> 
descriptions = result.descriptions();
+            assertEquals(Collections.singleton(0), descriptions.keySet());
+            assertNotNull(descriptions.get(0));
+            assertEquals(Collections.singleton("/var/data/kafka"), 
descriptions.get(0).get().keySet());
+            
assertNull(descriptions.get(0).get().get("/var/data/kafka").error());
+            assertEquals(Collections.singleton(tp), 
descriptions.get(0).get().get("/var/data/kafka").replicaInfos().keySet());
+            assertEquals(1234567890, 
descriptions.get(0).get().get("/var/data/kafka").replicaInfos().get(tp).size());
+            assertEquals(0, 
descriptions.get(0).get().get("/var/data/kafka").replicaInfos().get(tp).offsetLag());
+            
assertFalse(descriptions.get(0).get().get("/var/data/kafka").replicaInfos().get(tp).isFuture());

Review comment:
       These blocks of assertions are quite hard to read. Can we try to make 
them more digestible? We could perhaps extract temporary variables to reduce the 
number of `.get()` calls. We could also define a `verifyDescription` helper that 
verifies a `LogDirDescription`, for instance. It may be worth having dedicated 
unit tests for the new and the old APIs as well.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/LogDirDescription.java
##########
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ApiException;
+
+import java.util.Map;
+
+import static java.util.Collections.unmodifiableMap;
+
+/**
+ * A description of a log directory on a particular broker.
+ */
+public class LogDirDescription {
+    private final Map<TopicPartition, ReplicaInfo> replicaInfos;
+    private final ApiException error;
+
+    public LogDirDescription(ApiException error, Map<TopicPartition, 
ReplicaInfo> replicaInfos) {
+        this.error = error;
+        this.replicaInfos = replicaInfos;
+    }
+
+    /**
+     * A KafkaStorageException if this log directory is offline,
+     * possibly some other exception if there were problems describing the log 
directory
+     * or null if the directory is online.
+     */
+    public ApiException error() {
+        return error;
+    }
+
+    /**
+     * A map from topic partition to replica information for that partition
+     * in this log directory.
+     */
+    public Map<TopicPartition, ReplicaInfo> replicaInfos() {
+        return unmodifiableMap(replicaInfos);
+    }
+
+    @Override
+    public String toString() {
+        return "LogDirDescription{" +
+                "replicaInfos=" + replicaInfos +
+                ", error=" + error +
+                '}';

Review comment:
       nit: We usually use parentheses instead of curly braces.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to