dajac commented on a change in pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#discussion_r412287554



##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult 
alterReplicaLogDirs(Map<TopicPartitionReplica,
         for (TopicPartitionReplica replica : replicaAssignment.keySet())
             futures.put(replica, new KafkaFutureImpl<>());
 
-        Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = 
new HashMap<>();
+        Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker 
= new HashMap<>();
         for (Map.Entry<TopicPartitionReplica, String> entry: 
replicaAssignment.entrySet()) {
             TopicPartitionReplica replica = entry.getKey();
             String logDir = entry.getValue();
             int brokerId = replica.brokerId();
             TopicPartition topicPartition = new 
TopicPartition(replica.topic(), replica.partition());

Review comment:
       `topicPartition` is not used except for getting the topic and the 
partition above. `replica.topic()` and `replica.partition()` could be directly 
used instead.

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult 
alterReplicaLogDirs(Map<TopicPartitionReplica,
         for (TopicPartitionReplica replica : replicaAssignment.keySet())
             futures.put(replica, new KafkaFutureImpl<>());
 
-        Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = 
new HashMap<>();
+        Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker 
= new HashMap<>();
         for (Map.Entry<TopicPartitionReplica, String> entry: 
replicaAssignment.entrySet()) {
             TopicPartitionReplica replica = entry.getKey();
             String logDir = entry.getValue();
             int brokerId = replica.brokerId();
             TopicPartition topicPartition = new 
TopicPartition(replica.topic(), replica.partition());
-            if (!replicaAssignmentByBroker.containsKey(brokerId))
-                replicaAssignmentByBroker.put(brokerId, new HashMap<>());
-            replicaAssignmentByBroker.get(brokerId).put(topicPartition, 
logDir);
+            AlterReplicaLogDirsRequestData value = 
replicaAssignmentByBroker.computeIfAbsent(brokerId,

Review comment:
       nit: Could we rename `value` to something like `alterReplicaLogDirs`?

##########
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##########
@@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult 
alterReplicaLogDirs(Map<TopicPartitionReplica,
         for (TopicPartitionReplica replica : replicaAssignment.keySet())
             futures.put(replica, new KafkaFutureImpl<>());
 
-        Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = 
new HashMap<>();
+        Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker 
= new HashMap<>();
         for (Map.Entry<TopicPartitionReplica, String> entry: 
replicaAssignment.entrySet()) {
             TopicPartitionReplica replica = entry.getKey();
             String logDir = entry.getValue();
             int brokerId = replica.brokerId();
             TopicPartition topicPartition = new 
TopicPartition(replica.topic(), replica.partition());
-            if (!replicaAssignmentByBroker.containsKey(brokerId))
-                replicaAssignmentByBroker.put(brokerId, new HashMap<>());
-            replicaAssignmentByBroker.get(brokerId).put(topicPartition, 
logDir);
+            AlterReplicaLogDirsRequestData value = 
replicaAssignmentByBroker.computeIfAbsent(brokerId,
+                key -> new AlterReplicaLogDirsRequestData());
+            AlterReplicaLogDir alterReplicaLogDir = value.dirs().find(logDir);
+            if (alterReplicaLogDir == null) {
+                alterReplicaLogDir = new AlterReplicaLogDir();
+                alterReplicaLogDir.setPath(logDir);
+                value.dirs().add(alterReplicaLogDir);
+            }
+            AlterReplicaLogDirTopic alterReplicaLogDirTopic = 
alterReplicaLogDir.topics().find(topicPartition.topic());
+            if (alterReplicaLogDirTopic == null) {
+                alterReplicaLogDirTopic = new AlterReplicaLogDirTopic();
+                alterReplicaLogDir.topics().add(alterReplicaLogDirTopic);
+            }
+            alterReplicaLogDirTopic.setName(topicPartition.topic())

Review comment:
       `setName` could be done only once within the if statement.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
##########
@@ -17,122 +17,53 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
 
 
 public class AlterReplicaLogDirsResponse extends AbstractResponse {
 
-    // request level key names
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level key names
-    private static final String PARTITIONS_KEY_NAME = "partitions";
+    private final AlterReplicaLogDirsResponseData data;
 
-    private static final Schema ALTER_REPLICA_LOG_DIRS_RESPONSE_V0 = new 
Schema(
-            THROTTLE_TIME_MS,
-            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-                    TOPIC_NAME,
-                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema(
-                            PARTITION_ID,
-                            ERROR_CODE)))))));
-
-    /**
-     * The version number is bumped to indicate that on quota violation 
brokers send out responses before throttling.
-     */
-    private static final Schema ALTER_REPLICA_LOG_DIRS_RESPONSE_V1 = 
ALTER_REPLICA_LOG_DIRS_RESPONSE_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ALTER_REPLICA_LOG_DIRS_RESPONSE_V0, 
ALTER_REPLICA_LOG_DIRS_RESPONSE_V1};
+    public AlterReplicaLogDirsResponse(Struct struct) {
+        this(struct, ApiKeys.ALTER_REPLICA_LOG_DIRS.latestVersion());
     }
 
-    /**
-     * Possible error code:
-     *
-     * LOG_DIR_NOT_FOUND (57)
-     * KAFKA_STORAGE_ERROR (56)
-     * REPLICA_NOT_AVAILABLE (9)
-     * UNKNOWN (-1)

Review comment:
       Could we keep those and add them in the javadoc of 
`AlterReplicaLogDirsResponse` like we did 
[here](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java#L31)?
 

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3382,6 +3388,78 @@ public void testAlterClientQuotas() throws Exception {
         }
     }
 
+    @Test
+    public void testAlterReplicaLogDirsSuccess() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {

Review comment:
       nit: The mocked environment creates 3 nodes (by default) that you can 
use so you don't have to create them. You can get them with 
`env.getCluster().nodeById(..)`.

##########
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##########
@@ -2198,4 +2203,31 @@ private OffsetDeleteResponse 
createOffsetDeleteResponse() {
         return new OffsetDeleteResponse(data);
     }
 
+    private AlterReplicaLogDirsRequest createAlterReplicaLogDirsRequest() {
+        AlterReplicaLogDirsRequestData data = new 
AlterReplicaLogDirsRequestData();
+        data.dirs().add(
+                new 
AlterReplicaLogDirsRequestData.AlterReplicaLogDir().setPath("/data0").setTopics(
+                        new 
AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopicCollection(Collections.singletonList(
+                                new 
AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic().setPartitions(singletonList(0)).setName("topic")
+                        ).iterator())
+                )

Review comment:
       nit: Could we break these long lines like you did bellow already?

##########
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##########
@@ -2589,15 +2589,27 @@ class KafkaApis(val requestChannel: RequestChannel,
       new DescribeConfigsResponse(requestThrottleMs, (authorizedConfigs ++ 
unauthorizedConfigs).asJava))
   }
 
+

Review comment:
       nit: this extra empty line can be removed.

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1707,4 +1707,38 @@ class KafkaApisTest {
       0, 0, partitionStates.asJava, Seq(broker).asJava).build()
     metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
   }
+
+  @Test
+  def testAlterReplicaLogDirs(): Unit = {
+    val data = new AlterReplicaLogDirsRequestData()
+    val dir = new AlterReplicaLogDirsRequestData.AlterReplicaLogDir()
+      .setPath("/foo")
+    dir.topics().add(new 
AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic().setName("t0").setPartitions(asList(0,
 1, 2)))
+    data.dirs().add(dir)
+    val alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(
+      data
+    ).build()
+    val request = buildRequest(alterReplicaLogDirsRequest)
+
+    EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+
+    val capturedResponse = expectNoThrottling()
+    EasyMock.expect(replicaManager.alterReplicaLogDirs(EasyMock.eq(Map(
+      new TopicPartition("t0", 0) -> "/foo",
+      new TopicPartition("t0", 1) -> "/foo",
+      new TopicPartition("t0", 2) -> "/foo"))))
+      .andReturn(Map(
+        new TopicPartition("t0", 0) -> Errors.NONE,
+        new TopicPartition("t0", 1) -> Errors.LOG_DIR_NOT_FOUND,
+        new TopicPartition("t0", 2) -> Errors.INVALID_TOPIC_EXCEPTION))

Review comment:
       nit: Indentation seems inconsistent.

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3382,6 +3388,78 @@ public void testAlterClientQuotas() throws Exception {
         }
     }
 
+    @Test
+    public void testAlterReplicaLogDirsSuccess() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            createAlterLogDirsResponse(env, node0, Errors.NONE, 0);
+            createAlterLogDirsResponse(env, node1, Errors.NONE, 0);
+
+            TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+            TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);
+
+            Map<TopicPartitionReplica, String> logDirs = new HashMap<>();
+            logDirs.put(tpr0, "/data0");
+            logDirs.put(tpr1, "/data1");
+            AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+            result.values().get(tpr0).get();
+            result.values().get(tpr1).get();
+        }
+    }
+
+    @Test
+    public void testAlterReplicaLogDirsLogDirNotFound() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            createAlterLogDirsResponse(env, node0, Errors.NONE, 0);
+            createAlterLogDirsResponse(env, node1, Errors.LOG_DIR_NOT_FOUND, 
0);
+
+            TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+            TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);
+
+            Map<TopicPartitionReplica, String> logDirs = new HashMap<>();
+            logDirs.put(tpr0, "/data0");
+            logDirs.put(tpr1, "/data1");
+            AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+            result.values().get(tpr0).get();
+            TestUtils.assertFutureError(result.values().get(tpr1), 
LogDirNotFoundException.class);
+        }
+    }
+
+    @Test
+    public void testAlterReplicaLogDirsUnrequested() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            createAlterLogDirsResponse(env, node0, Errors.NONE, 0);
+            createAlterLogDirsResponse(env, node1, Errors.NONE, 1, 2);
+
+            TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+            TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);
+
+            Map<TopicPartitionReplica, String> logDirs = new HashMap<>();
+            logDirs.put(tpr0, "/data0");
+            logDirs.put(tpr1, "/data1");
+            AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+            // alterReplicaLogDirs() error handling fails all futures, but 
some of them may already be completed

Review comment:
       Couldn't we just remove `tpr0` from the test then? It does not seem to 
bring much.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java
##########
@@ -17,122 +17,53 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
-import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE;
-import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID;
-import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS;
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
+import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
 
 
 public class AlterReplicaLogDirsResponse extends AbstractResponse {
 
-    // request level key names
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level key names
-    private static final String PARTITIONS_KEY_NAME = "partitions";
+    private final AlterReplicaLogDirsResponseData data;
 
-    private static final Schema ALTER_REPLICA_LOG_DIRS_RESPONSE_V0 = new 
Schema(
-            THROTTLE_TIME_MS,
-            new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema(
-                    TOPIC_NAME,
-                    new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema(
-                            PARTITION_ID,
-                            ERROR_CODE)))))));
-
-    /**
-     * The version number is bumped to indicate that on quota violation 
brokers send out responses before throttling.
-     */
-    private static final Schema ALTER_REPLICA_LOG_DIRS_RESPONSE_V1 = 
ALTER_REPLICA_LOG_DIRS_RESPONSE_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ALTER_REPLICA_LOG_DIRS_RESPONSE_V0, 
ALTER_REPLICA_LOG_DIRS_RESPONSE_V1};
+    public AlterReplicaLogDirsResponse(Struct struct) {
+        this(struct, ApiKeys.ALTER_REPLICA_LOG_DIRS.latestVersion());
     }
 
-    /**
-     * Possible error code:
-     *
-     * LOG_DIR_NOT_FOUND (57)
-     * KAFKA_STORAGE_ERROR (56)
-     * REPLICA_NOT_AVAILABLE (9)
-     * UNKNOWN (-1)
-     */
-    private final Map<TopicPartition, Errors> responses;
-    private final int throttleTimeMs;
+    public AlterReplicaLogDirsResponse(Struct struct, short version) {
+        this.data = new AlterReplicaLogDirsResponseData(struct, version);
+    }
 
-    public AlterReplicaLogDirsResponse(Struct struct) {
-        throttleTimeMs = struct.get(THROTTLE_TIME_MS);
-        responses = new HashMap<>();
-        for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) {
-            Struct topicStruct = (Struct) topicStructObj;
-            String topic = topicStruct.get(TOPIC_NAME);
-            for (Object partitionStructObj : 
topicStruct.getArray(PARTITIONS_KEY_NAME)) {
-                Struct partitionStruct = (Struct) partitionStructObj;
-                int partition = partitionStruct.get(PARTITION_ID);
-                Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE));
-                responses.put(new TopicPartition(topic, partition), error);
-            }
-        }
+    public AlterReplicaLogDirsResponse(AlterReplicaLogDirsResponseData data) {
+        this.data = data;
     }
 
-    /**
-     * Constructor for version 0.
-     */
-    public AlterReplicaLogDirsResponse(int throttleTimeMs, Map<TopicPartition, 
Errors> responses) {
-        this.throttleTimeMs = throttleTimeMs;
-        this.responses = responses;
+    public AlterReplicaLogDirsResponseData data() {
+        return data;
     }
 
     @Override
     protected Struct toStruct(short version) {
-        Struct struct = new 
Struct(ApiKeys.ALTER_REPLICA_LOG_DIRS.responseSchema(version));
-        struct.set(THROTTLE_TIME_MS, throttleTimeMs);
-        Map<String, Map<Integer, Errors>> responsesByTopic = 
CollectionUtils.groupPartitionDataByTopic(responses);
-        List<Struct> topicStructArray = new ArrayList<>();
-        for (Map.Entry<String, Map<Integer, Errors>> responsesByTopicEntry : 
responsesByTopic.entrySet()) {
-            Struct topicStruct = struct.instance(TOPICS_KEY_NAME);
-            topicStruct.set(TOPIC_NAME, responsesByTopicEntry.getKey());
-            List<Struct> partitionStructArray = new ArrayList<>();
-            for (Map.Entry<Integer, Errors> responsesByPartitionEntry : 
responsesByTopicEntry.getValue().entrySet()) {
-                Struct partitionStruct = 
topicStruct.instance(PARTITIONS_KEY_NAME);
-                Errors response = responsesByPartitionEntry.getValue();
-                partitionStruct.set(PARTITION_ID, 
responsesByPartitionEntry.getKey());
-                partitionStruct.set(ERROR_CODE, response.code());
-                partitionStructArray.add(partitionStruct);
-            }
-            topicStruct.set(PARTITIONS_KEY_NAME, 
partitionStructArray.toArray());
-            topicStructArray.add(topicStruct);
-        }
-        struct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
-        return struct;
+        return data.toStruct(version);
     }
 
     @Override
     public int throttleTimeMs() {
-        return throttleTimeMs;
-    }
-
-    public Map<TopicPartition, Errors> responses() {
-        return this.responses;
+        return data.throttleTimeMs();
     }
 
     @Override
     public Map<Errors, Integer> errorCounts() {
-        return errorCounts(responses.values());
+        Map<Errors, Integer> errorCounts = new HashMap<>();

Review comment:
       Could we add a unit test for this one?

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
##########
@@ -17,141 +17,84 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
+import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
 
 public class AlterReplicaLogDirsRequest extends AbstractRequest {
 
-    // request level key names
-    private static final String LOG_DIRS_KEY_NAME = "log_dirs";
-
-    // log dir level key names
-    private static final String LOG_DIR_KEY_NAME = "log_dir";
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level key names
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V0 = new Schema(
-            new Field("log_dirs", new ArrayOf(new Schema(
-                    new Field("log_dir", STRING, "The absolute log directory 
path."),
-                    new Field("topics", new ArrayOf(new Schema(
-                            TOPIC_NAME,
-                            new Field("partitions", new ArrayOf(INT32), "List 
of partition ids of the topic."))))))));
-
-    /**
-     * The version number is bumped to indicate that on quota violation 
brokers send out responses before throttling.
-     */
-    private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V1 = 
ALTER_REPLICA_LOG_DIRS_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ALTER_REPLICA_LOG_DIRS_REQUEST_V0, 
ALTER_REPLICA_LOG_DIRS_REQUEST_V1};
-    }
-
-    private final Map<TopicPartition, String> partitionDirs;
+    private final AlterReplicaLogDirsRequestData data;
 
     public static class Builder extends 
AbstractRequest.Builder<AlterReplicaLogDirsRequest> {
-        private final Map<TopicPartition, String> partitionDirs;
+        private final AlterReplicaLogDirsRequestData data;
 
-        public Builder(Map<TopicPartition, String> partitionDirs) {
+        public Builder(AlterReplicaLogDirsRequestData data) {
             super(ApiKeys.ALTER_REPLICA_LOG_DIRS);
-            this.partitionDirs = partitionDirs;
+            this.data = data;
         }
 
         @Override
         public AlterReplicaLogDirsRequest build(short version) {
-            return new AlterReplicaLogDirsRequest(partitionDirs, version);
+            return new AlterReplicaLogDirsRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append("(type=AlterReplicaLogDirsRequest")
-                .append(", partitionDirs=")
-                .append(partitionDirs)
-                .append(")");
-            return builder.toString();
+            return data.toString();
         }
     }
 
     public AlterReplicaLogDirsRequest(Struct struct, short version) {
         super(ApiKeys.ALTER_REPLICA_LOG_DIRS, version);
-        partitionDirs = new HashMap<>();
-        for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
-            Struct logDirStruct = (Struct) logDirStructObj;
-            String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
-            for (Object topicStructObj : 
logDirStruct.getArray(TOPICS_KEY_NAME)) {
-                Struct topicStruct = (Struct) topicStructObj;
-                String topic = topicStruct.get(TOPIC_NAME);
-                for (Object partitionObj : 
topicStruct.getArray(PARTITIONS_KEY_NAME)) {
-                    int partition = (Integer) partitionObj;
-                    partitionDirs.put(new TopicPartition(topic, partition), 
logDir);
-                }
-            }
-        }
+        this.data = new AlterReplicaLogDirsRequestData(struct, version);
     }
 
-    public AlterReplicaLogDirsRequest(Map<TopicPartition, String> 
partitionDirs, short version) {
+    public AlterReplicaLogDirsRequest(AlterReplicaLogDirsRequestData data, 
short version) {
         super(ApiKeys.ALTER_REPLICA_LOG_DIRS, version);
-        this.partitionDirs = partitionDirs;
+        this.data = data;
     }
 
     @Override
     protected Struct toStruct() {
-        Map<String, List<TopicPartition>> dirPartitions = new HashMap<>();
-        for (Map.Entry<TopicPartition, String> entry: 
partitionDirs.entrySet()) {
-            if (!dirPartitions.containsKey(entry.getValue()))
-                dirPartitions.put(entry.getValue(), new ArrayList<>());
-            dirPartitions.get(entry.getValue()).add(entry.getKey());
-        }
+        return data.toStruct(version());
+    }
 
-        Struct struct = new 
Struct(ApiKeys.ALTER_REPLICA_LOG_DIRS.requestSchema(version()));
-        List<Struct> logDirStructArray = new ArrayList<>();
-        for (Map.Entry<String, List<TopicPartition>> logDirEntry: 
dirPartitions.entrySet()) {
-            Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
-            logDirStruct.set(LOG_DIR_KEY_NAME, logDirEntry.getKey());
-
-            List<Struct> topicStructArray = new ArrayList<>();
-            for (Map.Entry<String, List<Integer>> topicEntry: 
CollectionUtils.groupPartitionsByTopic(logDirEntry.getValue()).entrySet()) {
-                Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
-                topicStruct.set(TOPIC_NAME, topicEntry.getKey());
-                topicStruct.set(PARTITIONS_KEY_NAME, 
topicEntry.getValue().toArray());
-                topicStructArray.add(topicStruct);
-            }
-            logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
-            logDirStructArray.add(logDirStruct);
-        }
-        struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray());
-        return struct;
+    @Override
+    public AlterReplicaLogDirsResponse getErrorResponse(Throwable e) {
+        return (AlterReplicaLogDirsResponse) super.getErrorResponse(e);
     }
 
     @Override
-    public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) {
-        Map<TopicPartition, Errors> responseMap = new HashMap<>();
-        for (Map.Entry<TopicPartition, String> entry : 
partitionDirs.entrySet()) {
-            responseMap.put(entry.getKey(), Errors.forException(e));
-        }
-        return new AlterReplicaLogDirsResponse(throttleTimeMs, responseMap);
+    public AlterReplicaLogDirsResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {

Review comment:
       Could we add a unit test for this one and the one bellow?

##########
File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
##########
@@ -1707,4 +1707,38 @@ class KafkaApisTest {
       0, 0, partitionStates.asJava, Seq(broker).asJava).build()
     metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest)
   }
+
+  @Test
+  def testAlterReplicaLogDirs(): Unit = {
+    val data = new AlterReplicaLogDirsRequestData()
+    val dir = new AlterReplicaLogDirsRequestData.AlterReplicaLogDir()
+      .setPath("/foo")
+    dir.topics().add(new 
AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic().setName("t0").setPartitions(asList(0,
 1, 2)))
+    data.dirs().add(dir)
+    val alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder(
+      data
+    ).build()
+    val request = buildRequest(alterReplicaLogDirsRequest)
+
+    EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel)
+
+    val capturedResponse = expectNoThrottling()
+    EasyMock.expect(replicaManager.alterReplicaLogDirs(EasyMock.eq(Map(
+      new TopicPartition("t0", 0) -> "/foo",
+      new TopicPartition("t0", 1) -> "/foo",
+      new TopicPartition("t0", 2) -> "/foo"))))
+      .andReturn(Map(
+        new TopicPartition("t0", 0) -> Errors.NONE,
+        new TopicPartition("t0", 1) -> Errors.LOG_DIR_NOT_FOUND,
+        new TopicPartition("t0", 2) -> Errors.INVALID_TOPIC_EXCEPTION))
+    EasyMock.replay(replicaManager, clientQuotaManager, 
clientRequestQuotaManager, requestChannel)
+
+    createKafkaApis().handleAlterReplicaLogDirsRequest(request)
+
+    val response = readResponse(ApiKeys.ALTER_REPLICA_LOG_DIRS, 
alterReplicaLogDirsRequest, capturedResponse)
+      .asInstanceOf[AlterReplicaLogDirsResponse]
+    assertEquals(Map(Errors.NONE -> 1,

Review comment:
       It is good to validate the `errorCounts` but could we also validate the 
overall mapping from TopicPartition to Errors?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3382,6 +3388,78 @@ public void testAlterClientQuotas() throws Exception {
         }
     }
 
+    @Test
+    public void testAlterReplicaLogDirsSuccess() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            createAlterLogDirsResponse(env, node0, Errors.NONE, 0);
+            createAlterLogDirsResponse(env, node1, Errors.NONE, 0);
+
+            TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+            TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);
+
+            Map<TopicPartitionReplica, String> logDirs = new HashMap<>();
+            logDirs.put(tpr0, "/data0");
+            logDirs.put(tpr1, "/data1");
+            AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+            result.values().get(tpr0).get();
+            result.values().get(tpr1).get();
+        }
+    }
+
+    @Test
+    public void testAlterReplicaLogDirsLogDirNotFound() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            createAlterLogDirsResponse(env, node0, Errors.NONE, 0);
+            createAlterLogDirsResponse(env, node1, Errors.LOG_DIR_NOT_FOUND, 
0);
+
+            TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+            TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);
+
+            Map<TopicPartitionReplica, String> logDirs = new HashMap<>();
+            logDirs.put(tpr0, "/data0");
+            logDirs.put(tpr1, "/data1");
+            AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+            result.values().get(tpr0).get();
+            TestUtils.assertFutureError(result.values().get(tpr1), 
LogDirNotFoundException.class);
+        }
+    }
+
+    @Test
+    public void testAlterReplicaLogDirsUnrequested() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            createAlterLogDirsResponse(env, node0, Errors.NONE, 0);
+            createAlterLogDirsResponse(env, node1, Errors.NONE, 1, 2);
+
+            TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+            TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);
+
+            Map<TopicPartitionReplica, String> logDirs = new HashMap<>();
+            logDirs.put(tpr0, "/data0");
+            logDirs.put(tpr1, "/data1");
+            AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+            // alterReplicaLogDirs() error handling fails all futures, but 
some of them may already be completed
+            // so we can't make a reliable assertion about 
result.values().get(tpr0)
+            TestUtils.assertFutureError(result.values().get(tpr1), 
IllegalStateException.class);
+        }
+    }
+
+    private void createAlterLogDirsResponse(AdminClientUnitTestEnv env, Node 
node, Errors error, int... partition) {

Review comment:
       nit: `partitions`?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3382,6 +3388,78 @@ public void testAlterClientQuotas() throws Exception {
         }
     }
 
+    @Test
+    public void testAlterReplicaLogDirsSuccess() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            createAlterLogDirsResponse(env, node0, Errors.NONE, 0);
+            createAlterLogDirsResponse(env, node1, Errors.NONE, 0);
+
+            TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+            TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);
+
+            Map<TopicPartitionReplica, String> logDirs = new HashMap<>();
+            logDirs.put(tpr0, "/data0");
+            logDirs.put(tpr1, "/data1");
+            AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+            result.values().get(tpr0).get();
+            result.values().get(tpr1).get();

Review comment:
       nit: We could use `assertNull` here and in the other tests when the 
future returns `null`.

##########
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java
##########
@@ -17,141 +17,84 @@
 
 package org.apache.kafka.common.requests;
 
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.protocol.ApiKeys;
-import org.apache.kafka.common.protocol.Errors;
-import org.apache.kafka.common.protocol.types.ArrayOf;
-import org.apache.kafka.common.protocol.types.Field;
-import org.apache.kafka.common.protocol.types.Schema;
-import org.apache.kafka.common.protocol.types.Struct;
-import org.apache.kafka.common.utils.CollectionUtils;
-
 import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
-import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME;
-import static org.apache.kafka.common.protocol.types.Type.INT32;
-import static org.apache.kafka.common.protocol.types.Type.STRING;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData;
+import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData;
+import 
org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.protocol.types.Struct;
 
 public class AlterReplicaLogDirsRequest extends AbstractRequest {
 
-    // request level key names
-    private static final String LOG_DIRS_KEY_NAME = "log_dirs";
-
-    // log dir level key names
-    private static final String LOG_DIR_KEY_NAME = "log_dir";
-    private static final String TOPICS_KEY_NAME = "topics";
-
-    // topic level key names
-    private static final String PARTITIONS_KEY_NAME = "partitions";
-
-    private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V0 = new Schema(
-            new Field("log_dirs", new ArrayOf(new Schema(
-                    new Field("log_dir", STRING, "The absolute log directory 
path."),
-                    new Field("topics", new ArrayOf(new Schema(
-                            TOPIC_NAME,
-                            new Field("partitions", new ArrayOf(INT32), "List 
of partition ids of the topic."))))))));
-
-    /**
-     * The version number is bumped to indicate that on quota violation 
brokers send out responses before throttling.
-     */
-    private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V1 = 
ALTER_REPLICA_LOG_DIRS_REQUEST_V0;
-
-    public static Schema[] schemaVersions() {
-        return new Schema[]{ALTER_REPLICA_LOG_DIRS_REQUEST_V0, 
ALTER_REPLICA_LOG_DIRS_REQUEST_V1};
-    }
-
-    private final Map<TopicPartition, String> partitionDirs;
+    private final AlterReplicaLogDirsRequestData data;
 
     public static class Builder extends 
AbstractRequest.Builder<AlterReplicaLogDirsRequest> {
-        private final Map<TopicPartition, String> partitionDirs;
+        private final AlterReplicaLogDirsRequestData data;
 
-        public Builder(Map<TopicPartition, String> partitionDirs) {
+        public Builder(AlterReplicaLogDirsRequestData data) {
             super(ApiKeys.ALTER_REPLICA_LOG_DIRS);
-            this.partitionDirs = partitionDirs;
+            this.data = data;
         }
 
         @Override
         public AlterReplicaLogDirsRequest build(short version) {
-            return new AlterReplicaLogDirsRequest(partitionDirs, version);
+            return new AlterReplicaLogDirsRequest(data, version);
         }
 
         @Override
         public String toString() {
-            StringBuilder builder = new StringBuilder();
-            builder.append("(type=AlterReplicaLogDirsRequest")
-                .append(", partitionDirs=")
-                .append(partitionDirs)
-                .append(")");
-            return builder.toString();
+            return data.toString();
         }
     }
 
     public AlterReplicaLogDirsRequest(Struct struct, short version) {
         super(ApiKeys.ALTER_REPLICA_LOG_DIRS, version);
-        partitionDirs = new HashMap<>();
-        for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) {
-            Struct logDirStruct = (Struct) logDirStructObj;
-            String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME);
-            for (Object topicStructObj : 
logDirStruct.getArray(TOPICS_KEY_NAME)) {
-                Struct topicStruct = (Struct) topicStructObj;
-                String topic = topicStruct.get(TOPIC_NAME);
-                for (Object partitionObj : 
topicStruct.getArray(PARTITIONS_KEY_NAME)) {
-                    int partition = (Integer) partitionObj;
-                    partitionDirs.put(new TopicPartition(topic, partition), 
logDir);
-                }
-            }
-        }
+        this.data = new AlterReplicaLogDirsRequestData(struct, version);
     }
 
-    public AlterReplicaLogDirsRequest(Map<TopicPartition, String> 
partitionDirs, short version) {
+    public AlterReplicaLogDirsRequest(AlterReplicaLogDirsRequestData data, 
short version) {
         super(ApiKeys.ALTER_REPLICA_LOG_DIRS, version);
-        this.partitionDirs = partitionDirs;
+        this.data = data;
     }
 
     @Override
     protected Struct toStruct() {
-        Map<String, List<TopicPartition>> dirPartitions = new HashMap<>();
-        for (Map.Entry<TopicPartition, String> entry: 
partitionDirs.entrySet()) {
-            if (!dirPartitions.containsKey(entry.getValue()))
-                dirPartitions.put(entry.getValue(), new ArrayList<>());
-            dirPartitions.get(entry.getValue()).add(entry.getKey());
-        }
+        return data.toStruct(version());
+    }
 
-        Struct struct = new 
Struct(ApiKeys.ALTER_REPLICA_LOG_DIRS.requestSchema(version()));
-        List<Struct> logDirStructArray = new ArrayList<>();
-        for (Map.Entry<String, List<TopicPartition>> logDirEntry: 
dirPartitions.entrySet()) {
-            Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME);
-            logDirStruct.set(LOG_DIR_KEY_NAME, logDirEntry.getKey());
-
-            List<Struct> topicStructArray = new ArrayList<>();
-            for (Map.Entry<String, List<Integer>> topicEntry: 
CollectionUtils.groupPartitionsByTopic(logDirEntry.getValue()).entrySet()) {
-                Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME);
-                topicStruct.set(TOPIC_NAME, topicEntry.getKey());
-                topicStruct.set(PARTITIONS_KEY_NAME, 
topicEntry.getValue().toArray());
-                topicStructArray.add(topicStruct);
-            }
-            logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray());
-            logDirStructArray.add(logDirStruct);
-        }
-        struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray());
-        return struct;
+    @Override
+    public AlterReplicaLogDirsResponse getErrorResponse(Throwable e) {

Review comment:
       nit: This one is not used, if I'm not mistaken. Let's remove it.

##########
File path: 
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
##########
@@ -36,6 +37,13 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest 
{
 
   val topic = "topic"
 
+  private def findErrorForPartition(response: AlterReplicaLogDirsResponse, tp: 
TopicPartition): Errors = {
+    Errors.forCode(response.data.results.asScala.groupBy(x => 
x.topicName).get(tp.topic).get

Review comment:
       I find the implementation that you did in `AuthorizerIntegrationTest` at 
L195 easier to read. Could we use it here as well to stay consistent?

##########
File path: 
clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -3382,6 +3388,78 @@ public void testAlterClientQuotas() throws Exception {
         }
     }
 
+    @Test
+    public void testAlterReplicaLogDirsSuccess() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            createAlterLogDirsResponse(env, node0, Errors.NONE, 0);
+            createAlterLogDirsResponse(env, node1, Errors.NONE, 0);
+
+            TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+            TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);
+
+            Map<TopicPartitionReplica, String> logDirs = new HashMap<>();
+            logDirs.put(tpr0, "/data0");
+            logDirs.put(tpr1, "/data1");
+            AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+            result.values().get(tpr0).get();
+            result.values().get(tpr1).get();
+        }
+    }
+
+    @Test
+    public void testAlterReplicaLogDirsLogDirNotFound() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            createAlterLogDirsResponse(env, node0, Errors.NONE, 0);
+            createAlterLogDirsResponse(env, node1, Errors.LOG_DIR_NOT_FOUND, 
0);
+
+            TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+            TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);
+
+            Map<TopicPartitionReplica, String> logDirs = new HashMap<>();
+            logDirs.put(tpr0, "/data0");
+            logDirs.put(tpr1, "/data1");
+            AlterReplicaLogDirsResult result = 
env.adminClient().alterReplicaLogDirs(logDirs);
+            result.values().get(tpr0).get();
+            TestUtils.assertFutureError(result.values().get(tpr1), 
LogDirNotFoundException.class);
+        }
+    }
+
+    @Test
+    public void testAlterReplicaLogDirsUnrequested() throws Exception {
+        Node node0 = new Node(0, "localhost", 8120);
+        Node node1 = new Node(1, "localhost", 8121);
+        try (AdminClientUnitTestEnv env = mockClientEnv()) {
+            createAlterLogDirsResponse(env, node0, Errors.NONE, 0);
+            createAlterLogDirsResponse(env, node1, Errors.NONE, 1, 2);
+
+            TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 
0);
+            TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 
1);

Review comment:
       Do you intentionally use a `TopicPartitionReplica` that is not provided 
in the response here? I guess it is to ensure that its future is not 
completed — is that right?

##########
File path: 
core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala
##########
@@ -104,13 +112,26 @@ class AlterReplicaLogDirsRequestTest extends 
BaseRequestTest {
     partitionDirs3.put(new TopicPartition(topic, 1), validDir3)
     partitionDirs3.put(new TopicPartition(topic, 2), offlineDir)
     val alterReplicaDirResponse3 = 
sendAlterReplicaLogDirsRequest(partitionDirs3.toMap)
-    assertEquals(Errors.LOG_DIR_NOT_FOUND, 
alterReplicaDirResponse3.responses().get(new TopicPartition(topic, 0)))
-    assertEquals(Errors.KAFKA_STORAGE_ERROR, 
alterReplicaDirResponse3.responses().get(new TopicPartition(topic, 1)))
-    assertEquals(Errors.KAFKA_STORAGE_ERROR, 
alterReplicaDirResponse3.responses().get(new TopicPartition(topic, 2)))
+    assertEquals(Errors.LOG_DIR_NOT_FOUND, 
findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 0)))
+    assertEquals(Errors.KAFKA_STORAGE_ERROR, 
findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 1)))
+    assertEquals(Errors.KAFKA_STORAGE_ERROR, 
findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2)))
   }
 
   private def sendAlterReplicaLogDirsRequest(partitionDirs: 
Map[TopicPartition, String]): AlterReplicaLogDirsResponse = {
-    val request = new 
AlterReplicaLogDirsRequest.Builder(partitionDirs.asJava).build()
+    val logDirs = partitionDirs.groupBy{case (_, dir) => dir}.map{ case(dir, 
tps) =>
+      new AlterReplicaLogDirsRequestData.AlterReplicaLogDir()
+        .setPath(dir)
+        .setTopics(new 
AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopicCollection(
+          tps.groupBy { case (tp, _) => tp.topic }
+            .map { case (topic, tpPartitions) =>
+          new AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic()
+            .setName(topic)
+            .setPartitions(tpPartitions.map{case (tp, _) => 
tp.partition.asInstanceOf[Integer]}.toList.asJava)
+        }.toList.asJava.iterator))

Review comment:
       nit: Indentation looks wrong here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to