dajac commented on a change in pull request #8311: URL: https://github.com/apache/kafka/pull/8311#discussion_r412287554
########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, for (TopicPartitionReplica replica : replicaAssignment.keySet()) futures.put(replica, new KafkaFutureImpl<>()); - Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>(); + Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker = new HashMap<>(); for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) { TopicPartitionReplica replica = entry.getKey(); String logDir = entry.getValue(); int brokerId = replica.brokerId(); TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition()); Review comment: `topicPartition` is not used except for getting the topic and the partition above. `replica.topic()` and `replica.partition()` could be directly used instead. ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, for (TopicPartitionReplica replica : replicaAssignment.keySet()) futures.put(replica, new KafkaFutureImpl<>()); - Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>(); + Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker = new HashMap<>(); for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) { TopicPartitionReplica replica = entry.getKey(); String logDir = entry.getValue(); int brokerId = replica.brokerId(); TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition()); - if (!replicaAssignmentByBroker.containsKey(brokerId)) - replicaAssignmentByBroker.put(brokerId, new HashMap<>()); - replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir); + AlterReplicaLogDirsRequestData value = replicaAssignmentByBroker.computeIfAbsent(brokerId, Review comment: nit: Could we rename `value` to something like `alterReplicaLogDirs`? ########## File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java ########## @@ -2207,21 +2212,33 @@ public AlterReplicaLogDirsResult alterReplicaLogDirs(Map<TopicPartitionReplica, for (TopicPartitionReplica replica : replicaAssignment.keySet()) futures.put(replica, new KafkaFutureImpl<>()); - Map<Integer, Map<TopicPartition, String>> replicaAssignmentByBroker = new HashMap<>(); + Map<Integer, AlterReplicaLogDirsRequestData> replicaAssignmentByBroker = new HashMap<>(); for (Map.Entry<TopicPartitionReplica, String> entry: replicaAssignment.entrySet()) { TopicPartitionReplica replica = entry.getKey(); String logDir = entry.getValue(); int brokerId = replica.brokerId(); TopicPartition topicPartition = new TopicPartition(replica.topic(), replica.partition()); - if (!replicaAssignmentByBroker.containsKey(brokerId)) - replicaAssignmentByBroker.put(brokerId, new HashMap<>()); - replicaAssignmentByBroker.get(brokerId).put(topicPartition, logDir); + AlterReplicaLogDirsRequestData value = replicaAssignmentByBroker.computeIfAbsent(brokerId, + key -> new AlterReplicaLogDirsRequestData()); + AlterReplicaLogDir alterReplicaLogDir = value.dirs().find(logDir); + if (alterReplicaLogDir == null) { + alterReplicaLogDir = new AlterReplicaLogDir(); + alterReplicaLogDir.setPath(logDir); + value.dirs().add(alterReplicaLogDir); + } + AlterReplicaLogDirTopic alterReplicaLogDirTopic = alterReplicaLogDir.topics().find(topicPartition.topic()); + if (alterReplicaLogDirTopic == null) { + alterReplicaLogDirTopic = new AlterReplicaLogDirTopic(); + alterReplicaLogDir.topics().add(alterReplicaLogDirTopic); + } + alterReplicaLogDirTopic.setName(topicPartition.topic()) Review comment: `setName` could be done only once within the if statement. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java ########## @@ -17,122 +17,53 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; - import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; public class AlterReplicaLogDirsResponse extends AbstractResponse { - // request level key names - private static final String TOPICS_KEY_NAME = "topics"; - - // topic level key names - private static final String PARTITIONS_KEY_NAME = "partitions"; + private final AlterReplicaLogDirsResponseData data; - private static final Schema ALTER_REPLICA_LOG_DIRS_RESPONSE_V0 = new Schema( - THROTTLE_TIME_MS, - new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( - TOPIC_NAME, - new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema( - PARTITION_ID, - ERROR_CODE))))))); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema ALTER_REPLICA_LOG_DIRS_RESPONSE_V1 = ALTER_REPLICA_LOG_DIRS_RESPONSE_V0; - - public static Schema[] schemaVersions() { - return new Schema[]{ALTER_REPLICA_LOG_DIRS_RESPONSE_V0, ALTER_REPLICA_LOG_DIRS_RESPONSE_V1}; + public AlterReplicaLogDirsResponse(Struct struct) { + this(struct, ApiKeys.ALTER_REPLICA_LOG_DIRS.latestVersion()); } - /** - * Possible error code: - * - * LOG_DIR_NOT_FOUND (57) - * KAFKA_STORAGE_ERROR (56) - * REPLICA_NOT_AVAILABLE (9) - * UNKNOWN (-1) Review comment: Could we keep those and add them in the javadoc of `AlterReplicaLogDirsResponse` like we did [here](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/OffsetDeleteResponse.java#L31)? ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3382,6 +3388,78 @@ public void testAlterClientQuotas() throws Exception { } } + @Test + public void testAlterReplicaLogDirsSuccess() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + try (AdminClientUnitTestEnv env = mockClientEnv()) { Review comment: nit: The mocked environment creates 3 nodes (by default) that you can use so you don't have to create them. You can get them with `env.getCluster().nodeById(..)`. ########## File path: clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java ########## @@ -2198,4 +2203,31 @@ private OffsetDeleteResponse createOffsetDeleteResponse() { return new OffsetDeleteResponse(data); } + private AlterReplicaLogDirsRequest createAlterReplicaLogDirsRequest() { + AlterReplicaLogDirsRequestData data = new AlterReplicaLogDirsRequestData(); + data.dirs().add( + new AlterReplicaLogDirsRequestData.AlterReplicaLogDir().setPath("/data0").setTopics( + new AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopicCollection(Collections.singletonList( + new AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic().setPartitions(singletonList(0)).setName("topic") + ).iterator()) + ) Review comment: nit: Could we break these long lines like you did bellow already? ########## File path: core/src/main/scala/kafka/server/KafkaApis.scala ########## @@ -2589,15 +2589,27 @@ class KafkaApis(val requestChannel: RequestChannel, new DescribeConfigsResponse(requestThrottleMs, (authorizedConfigs ++ unauthorizedConfigs).asJava)) } + Review comment: nit: this extra empty line can be removed. ########## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ########## @@ -1707,4 +1707,38 @@ class KafkaApisTest { 0, 0, partitionStates.asJava, Seq(broker).asJava).build() metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest) } + + @Test + def testAlterReplicaLogDirs(): Unit = { + val data = new AlterReplicaLogDirsRequestData() + val dir = new AlterReplicaLogDirsRequestData.AlterReplicaLogDir() + .setPath("/foo") + dir.topics().add(new AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic().setName("t0").setPartitions(asList(0, 1, 2))) + data.dirs().add(dir) + val alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder( + data + ).build() + val request = buildRequest(alterReplicaLogDirsRequest) + + EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel) + + val capturedResponse = expectNoThrottling() + EasyMock.expect(replicaManager.alterReplicaLogDirs(EasyMock.eq(Map( + new TopicPartition("t0", 0) -> "/foo", + new TopicPartition("t0", 1) -> "/foo", + new TopicPartition("t0", 2) -> "/foo")))) + .andReturn(Map( + new TopicPartition("t0", 0) -> Errors.NONE, + new TopicPartition("t0", 1) -> Errors.LOG_DIR_NOT_FOUND, + new TopicPartition("t0", 2) -> Errors.INVALID_TOPIC_EXCEPTION)) Review comment: nit: Indentation seems inconsistent. ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3382,6 +3388,78 @@ public void testAlterClientQuotas() throws Exception { } } + @Test + public void testAlterReplicaLogDirsSuccess() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + createAlterLogDirsResponse(env, node0, Errors.NONE, 0); + createAlterLogDirsResponse(env, node1, Errors.NONE, 0); + + TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); + TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); + + Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); + logDirs.put(tpr0, "/data0"); + logDirs.put(tpr1, "/data1"); + AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); + result.values().get(tpr0).get(); + result.values().get(tpr1).get(); + } + } + + @Test + public void testAlterReplicaLogDirsLogDirNotFound() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + createAlterLogDirsResponse(env, node0, Errors.NONE, 0); + createAlterLogDirsResponse(env, node1, Errors.LOG_DIR_NOT_FOUND, 0); + + TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); + TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); + + Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); + logDirs.put(tpr0, "/data0"); + logDirs.put(tpr1, "/data1"); + AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); + result.values().get(tpr0).get(); + TestUtils.assertFutureError(result.values().get(tpr1), LogDirNotFoundException.class); + } + } + + @Test + public void testAlterReplicaLogDirsUnrequested() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + createAlterLogDirsResponse(env, node0, Errors.NONE, 0); + createAlterLogDirsResponse(env, node1, Errors.NONE, 1, 2); + + TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); + TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); + + Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); + logDirs.put(tpr0, "/data0"); + logDirs.put(tpr1, "/data1"); + AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); + // alterReplicaLogDirs() error handling fails all futures, but some of them may already be completed Review comment: Couldn't we just remove `tpr0` from the test then? It does not seem to bring much. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsResponse.java ########## @@ -17,122 +17,53 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; - import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; -import static org.apache.kafka.common.protocol.CommonFields.ERROR_CODE; -import static org.apache.kafka.common.protocol.CommonFields.PARTITION_ID; -import static org.apache.kafka.common.protocol.CommonFields.THROTTLE_TIME_MS; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; public class AlterReplicaLogDirsResponse extends AbstractResponse { - // request level key names - private static final String TOPICS_KEY_NAME = "topics"; - - // topic level key names - private static final String PARTITIONS_KEY_NAME = "partitions"; + private final AlterReplicaLogDirsResponseData data; - private static final Schema ALTER_REPLICA_LOG_DIRS_RESPONSE_V0 = new Schema( - THROTTLE_TIME_MS, - new Field(TOPICS_KEY_NAME, new ArrayOf(new Schema( - TOPIC_NAME, - new Field(PARTITIONS_KEY_NAME, new ArrayOf(new Schema( - PARTITION_ID, - ERROR_CODE))))))); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema ALTER_REPLICA_LOG_DIRS_RESPONSE_V1 = ALTER_REPLICA_LOG_DIRS_RESPONSE_V0; - - public static Schema[] schemaVersions() { - return new Schema[]{ALTER_REPLICA_LOG_DIRS_RESPONSE_V0, ALTER_REPLICA_LOG_DIRS_RESPONSE_V1}; + public AlterReplicaLogDirsResponse(Struct struct) { + this(struct, ApiKeys.ALTER_REPLICA_LOG_DIRS.latestVersion()); } - /** - * Possible error code: - * - * LOG_DIR_NOT_FOUND (57) - * KAFKA_STORAGE_ERROR (56) - * REPLICA_NOT_AVAILABLE (9) - * UNKNOWN (-1) - */ - private final Map<TopicPartition, Errors> responses; - private final int throttleTimeMs; + public AlterReplicaLogDirsResponse(Struct struct, short version) { + this.data = new AlterReplicaLogDirsResponseData(struct, version); + } - public AlterReplicaLogDirsResponse(Struct struct) { - throttleTimeMs = struct.get(THROTTLE_TIME_MS); - responses = new HashMap<>(); - for (Object topicStructObj : struct.getArray(TOPICS_KEY_NAME)) { - Struct topicStruct = (Struct) topicStructObj; - String topic = topicStruct.get(TOPIC_NAME); - for (Object partitionStructObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) { - Struct partitionStruct = (Struct) partitionStructObj; - int partition = partitionStruct.get(PARTITION_ID); - Errors error = Errors.forCode(partitionStruct.get(ERROR_CODE)); - responses.put(new TopicPartition(topic, partition), error); - } - } + public AlterReplicaLogDirsResponse(AlterReplicaLogDirsResponseData data) { + this.data = data; } - /** - * Constructor for version 0. - */ - public AlterReplicaLogDirsResponse(int throttleTimeMs, Map<TopicPartition, Errors> responses) { - this.throttleTimeMs = throttleTimeMs; - this.responses = responses; + public AlterReplicaLogDirsResponseData data() { + return data; } @Override protected Struct toStruct(short version) { - Struct struct = new Struct(ApiKeys.ALTER_REPLICA_LOG_DIRS.responseSchema(version)); - struct.set(THROTTLE_TIME_MS, throttleTimeMs); - Map<String, Map<Integer, Errors>> responsesByTopic = CollectionUtils.groupPartitionDataByTopic(responses); - List<Struct> topicStructArray = new ArrayList<>(); - for (Map.Entry<String, Map<Integer, Errors>> responsesByTopicEntry : responsesByTopic.entrySet()) { - Struct topicStruct = struct.instance(TOPICS_KEY_NAME); - topicStruct.set(TOPIC_NAME, responsesByTopicEntry.getKey()); - List<Struct> partitionStructArray = new ArrayList<>(); - for (Map.Entry<Integer, Errors> responsesByPartitionEntry : responsesByTopicEntry.getValue().entrySet()) { - Struct partitionStruct = topicStruct.instance(PARTITIONS_KEY_NAME); - Errors response = responsesByPartitionEntry.getValue(); - partitionStruct.set(PARTITION_ID, responsesByPartitionEntry.getKey()); - partitionStruct.set(ERROR_CODE, response.code()); - partitionStructArray.add(partitionStruct); - } - topicStruct.set(PARTITIONS_KEY_NAME, partitionStructArray.toArray()); - topicStructArray.add(topicStruct); - } - struct.set(TOPICS_KEY_NAME, topicStructArray.toArray()); - return struct; + return data.toStruct(version); } @Override public int throttleTimeMs() { - return throttleTimeMs; - } - - public Map<TopicPartition, Errors> responses() { - return this.responses; + return data.throttleTimeMs(); } @Override public Map<Errors, Integer> errorCounts() { - return errorCounts(responses.values()); + Map<Errors, Integer> errorCounts = new HashMap<>(); Review comment: Could we add a unit test for this one? ########## File path: clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java ########## @@ -17,141 +17,84 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; - import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.stream.Collectors; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.INT32; -import static org.apache.kafka.common.protocol.types.Type.STRING; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; public class AlterReplicaLogDirsRequest extends AbstractRequest { - // request level key names - private static final String LOG_DIRS_KEY_NAME = "log_dirs"; - - // log dir level key names - private static final String LOG_DIR_KEY_NAME = "log_dir"; - private static final String TOPICS_KEY_NAME = "topics"; - - // topic level key names - private static final String PARTITIONS_KEY_NAME = "partitions"; - - private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V0 = new Schema( - new Field("log_dirs", new ArrayOf(new Schema( - new Field("log_dir", STRING, "The absolute log directory path."), - new Field("topics", new ArrayOf(new Schema( - TOPIC_NAME, - new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic.")))))))); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V1 = ALTER_REPLICA_LOG_DIRS_REQUEST_V0; - - public static Schema[] schemaVersions() { - return new Schema[]{ALTER_REPLICA_LOG_DIRS_REQUEST_V0, ALTER_REPLICA_LOG_DIRS_REQUEST_V1}; - } - - private final Map<TopicPartition, String> partitionDirs; + private final AlterReplicaLogDirsRequestData data; public static class Builder extends AbstractRequest.Builder<AlterReplicaLogDirsRequest> { - private final Map<TopicPartition, String> partitionDirs; + private final AlterReplicaLogDirsRequestData data; - public Builder(Map<TopicPartition, String> partitionDirs) { + public Builder(AlterReplicaLogDirsRequestData data) { super(ApiKeys.ALTER_REPLICA_LOG_DIRS); - this.partitionDirs = partitionDirs; + this.data = data; } @Override public AlterReplicaLogDirsRequest build(short version) { - return new AlterReplicaLogDirsRequest(partitionDirs, version); + return new AlterReplicaLogDirsRequest(data, version); } @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("(type=AlterReplicaLogDirsRequest") - .append(", partitionDirs=") - .append(partitionDirs) - .append(")"); - return builder.toString(); + return data.toString(); } } public AlterReplicaLogDirsRequest(Struct struct, short version) { super(ApiKeys.ALTER_REPLICA_LOG_DIRS, version); - partitionDirs = new HashMap<>(); - for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) { - Struct logDirStruct = (Struct) logDirStructObj; - String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME); - for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) { - Struct topicStruct = (Struct) topicStructObj; - String topic = topicStruct.get(TOPIC_NAME); - for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) { - int partition = (Integer) partitionObj; - partitionDirs.put(new TopicPartition(topic, partition), logDir); - } - } - } + this.data = new AlterReplicaLogDirsRequestData(struct, version); } - public AlterReplicaLogDirsRequest(Map<TopicPartition, String> partitionDirs, short version) { + public AlterReplicaLogDirsRequest(AlterReplicaLogDirsRequestData data, short version) { super(ApiKeys.ALTER_REPLICA_LOG_DIRS, version); - this.partitionDirs = partitionDirs; + this.data = data; } @Override protected Struct toStruct() { - Map<String, List<TopicPartition>> dirPartitions = new HashMap<>(); - for (Map.Entry<TopicPartition, String> entry: partitionDirs.entrySet()) { - if (!dirPartitions.containsKey(entry.getValue())) - dirPartitions.put(entry.getValue(), new ArrayList<>()); - dirPartitions.get(entry.getValue()).add(entry.getKey()); - } + return data.toStruct(version()); + } - Struct struct = new Struct(ApiKeys.ALTER_REPLICA_LOG_DIRS.requestSchema(version())); - List<Struct> logDirStructArray = new ArrayList<>(); - for (Map.Entry<String, List<TopicPartition>> logDirEntry: dirPartitions.entrySet()) { - Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME); - logDirStruct.set(LOG_DIR_KEY_NAME, logDirEntry.getKey()); - - List<Struct> topicStructArray = new ArrayList<>(); - for (Map.Entry<String, List<Integer>> topicEntry: CollectionUtils.groupPartitionsByTopic(logDirEntry.getValue()).entrySet()) { - Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME); - topicStruct.set(TOPIC_NAME, topicEntry.getKey()); - topicStruct.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); - topicStructArray.add(topicStruct); - } - logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray()); - logDirStructArray.add(logDirStruct); - } - struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray()); - return struct; + @Override + public AlterReplicaLogDirsResponse getErrorResponse(Throwable e) { + return (AlterReplicaLogDirsResponse) super.getErrorResponse(e); } @Override - public AbstractResponse getErrorResponse(int throttleTimeMs, Throwable e) { - Map<TopicPartition, Errors> responseMap = new HashMap<>(); - for (Map.Entry<TopicPartition, String> entry : partitionDirs.entrySet()) { - responseMap.put(entry.getKey(), Errors.forException(e)); - } - return new AlterReplicaLogDirsResponse(throttleTimeMs, responseMap); + public AlterReplicaLogDirsResponse getErrorResponse(int throttleTimeMs, Throwable e) { Review comment: Could we add a unit test for this one and the one bellow? ########## File path: core/src/test/scala/unit/kafka/server/KafkaApisTest.scala ########## @@ -1707,4 +1707,38 @@ class KafkaApisTest { 0, 0, partitionStates.asJava, Seq(broker).asJava).build() metadataCache.updateMetadata(correlationId = 0, updateMetadataRequest) } + + @Test + def testAlterReplicaLogDirs(): Unit = { + val data = new AlterReplicaLogDirsRequestData() + val dir = new AlterReplicaLogDirsRequestData.AlterReplicaLogDir() + .setPath("/foo") + dir.topics().add(new AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic().setName("t0").setPartitions(asList(0, 1, 2))) + data.dirs().add(dir) + val alterReplicaLogDirsRequest = new AlterReplicaLogDirsRequest.Builder( + data + ).build() + val request = buildRequest(alterReplicaLogDirsRequest) + + EasyMock.reset(replicaManager, clientRequestQuotaManager, requestChannel) + + val capturedResponse = expectNoThrottling() + EasyMock.expect(replicaManager.alterReplicaLogDirs(EasyMock.eq(Map( + new TopicPartition("t0", 0) -> "/foo", + new TopicPartition("t0", 1) -> "/foo", + new TopicPartition("t0", 2) -> "/foo")))) + .andReturn(Map( + new TopicPartition("t0", 0) -> Errors.NONE, + new TopicPartition("t0", 1) -> Errors.LOG_DIR_NOT_FOUND, + new TopicPartition("t0", 2) -> Errors.INVALID_TOPIC_EXCEPTION)) + EasyMock.replay(replicaManager, clientQuotaManager, clientRequestQuotaManager, requestChannel) + + createKafkaApis().handleAlterReplicaLogDirsRequest(request) + + val response = readResponse(ApiKeys.ALTER_REPLICA_LOG_DIRS, alterReplicaLogDirsRequest, capturedResponse) + .asInstanceOf[AlterReplicaLogDirsResponse] + assertEquals(Map(Errors.NONE -> 1, Review comment: It is good to validate the `errorCounts` but could we also validate the overall mapping from TopicPartition to Errors? ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3382,6 +3388,78 @@ public void testAlterClientQuotas() throws Exception { } } + @Test + public void testAlterReplicaLogDirsSuccess() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + createAlterLogDirsResponse(env, node0, Errors.NONE, 0); + createAlterLogDirsResponse(env, node1, Errors.NONE, 0); + + TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); + TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); + + Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); + logDirs.put(tpr0, "/data0"); + logDirs.put(tpr1, "/data1"); + AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); + result.values().get(tpr0).get(); + result.values().get(tpr1).get(); + } + } + + @Test + public void testAlterReplicaLogDirsLogDirNotFound() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + createAlterLogDirsResponse(env, node0, Errors.NONE, 0); + createAlterLogDirsResponse(env, node1, Errors.LOG_DIR_NOT_FOUND, 0); + + TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); + TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); + + Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); + logDirs.put(tpr0, "/data0"); + logDirs.put(tpr1, "/data1"); + AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); + result.values().get(tpr0).get(); + TestUtils.assertFutureError(result.values().get(tpr1), LogDirNotFoundException.class); + } + } + + @Test + public void testAlterReplicaLogDirsUnrequested() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + createAlterLogDirsResponse(env, node0, Errors.NONE, 0); + createAlterLogDirsResponse(env, node1, Errors.NONE, 1, 2); + + TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); + TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); + + Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); + logDirs.put(tpr0, "/data0"); + logDirs.put(tpr1, "/data1"); + AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); + // alterReplicaLogDirs() error handling fails all futures, but some of them may already be completed + // so we can't make a reliable assertion about result.values().get(tpr0) + TestUtils.assertFutureError(result.values().get(tpr1), IllegalStateException.class); + } + } + + private void createAlterLogDirsResponse(AdminClientUnitTestEnv env, Node node, Errors error, int... partition) { Review comment: nit: `partitions`? ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3382,6 +3388,78 @@ public void testAlterClientQuotas() throws Exception { } } + @Test + public void testAlterReplicaLogDirsSuccess() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + createAlterLogDirsResponse(env, node0, Errors.NONE, 0); + createAlterLogDirsResponse(env, node1, Errors.NONE, 0); + + TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); + TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); + + Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); + logDirs.put(tpr0, "/data0"); + logDirs.put(tpr1, "/data1"); + AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); + result.values().get(tpr0).get(); + result.values().get(tpr1).get(); Review comment: nit: We could use `assertNull` here and in the other tests when the future returns `null`. ########## File path: clients/src/main/java/org/apache/kafka/common/requests/AlterReplicaLogDirsRequest.java ########## @@ -17,141 +17,84 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; -import org.apache.kafka.common.protocol.types.ArrayOf; -import org.apache.kafka.common.protocol.types.Field; -import org.apache.kafka.common.protocol.types.Schema; -import org.apache.kafka.common.protocol.types.Struct; -import org.apache.kafka.common.utils.CollectionUtils; - import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.stream.Collectors; -import static org.apache.kafka.common.protocol.CommonFields.TOPIC_NAME; -import static org.apache.kafka.common.protocol.types.Type.INT32; -import static org.apache.kafka.common.protocol.types.Type.STRING; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.message.AlterReplicaLogDirsRequestData; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData; +import org.apache.kafka.common.message.AlterReplicaLogDirsResponseData.AlterReplicaLogDirTopicResult; +import org.apache.kafka.common.protocol.ApiKeys; +import org.apache.kafka.common.protocol.Errors; +import org.apache.kafka.common.protocol.types.Struct; public class AlterReplicaLogDirsRequest extends AbstractRequest { - // request level key names - private static final String LOG_DIRS_KEY_NAME = "log_dirs"; - - // log dir level key names - private static final String LOG_DIR_KEY_NAME = "log_dir"; - private static final String TOPICS_KEY_NAME = "topics"; - - // topic level key names - private static final String PARTITIONS_KEY_NAME = "partitions"; - - private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V0 = new Schema( - new Field("log_dirs", new ArrayOf(new Schema( - new Field("log_dir", STRING, "The absolute log directory path."), - new Field("topics", new ArrayOf(new Schema( - TOPIC_NAME, - new Field("partitions", new ArrayOf(INT32), "List of partition ids of the topic.")))))))); - - /** - * The version number is bumped to indicate that on quota violation brokers send out responses before throttling. - */ - private static final Schema ALTER_REPLICA_LOG_DIRS_REQUEST_V1 = ALTER_REPLICA_LOG_DIRS_REQUEST_V0; - - public static Schema[] schemaVersions() { - return new Schema[]{ALTER_REPLICA_LOG_DIRS_REQUEST_V0, ALTER_REPLICA_LOG_DIRS_REQUEST_V1}; - } - - private final Map<TopicPartition, String> partitionDirs; + private final AlterReplicaLogDirsRequestData data; public static class Builder extends AbstractRequest.Builder<AlterReplicaLogDirsRequest> { - private final Map<TopicPartition, String> partitionDirs; + private final AlterReplicaLogDirsRequestData data; - public Builder(Map<TopicPartition, String> partitionDirs) { + public Builder(AlterReplicaLogDirsRequestData data) { super(ApiKeys.ALTER_REPLICA_LOG_DIRS); - this.partitionDirs = partitionDirs; + this.data = data; } @Override public AlterReplicaLogDirsRequest build(short version) { - return new AlterReplicaLogDirsRequest(partitionDirs, version); + return new AlterReplicaLogDirsRequest(data, version); } @Override public String toString() { - StringBuilder builder = new StringBuilder(); - builder.append("(type=AlterReplicaLogDirsRequest") - .append(", partitionDirs=") - .append(partitionDirs) - .append(")"); - return builder.toString(); + return data.toString(); } } public AlterReplicaLogDirsRequest(Struct struct, short version) { super(ApiKeys.ALTER_REPLICA_LOG_DIRS, version); - partitionDirs = new HashMap<>(); - for (Object logDirStructObj : struct.getArray(LOG_DIRS_KEY_NAME)) { - Struct logDirStruct = (Struct) logDirStructObj; - String logDir = logDirStruct.getString(LOG_DIR_KEY_NAME); - for (Object topicStructObj : logDirStruct.getArray(TOPICS_KEY_NAME)) { - Struct topicStruct = (Struct) topicStructObj; - String topic = topicStruct.get(TOPIC_NAME); - for (Object partitionObj : topicStruct.getArray(PARTITIONS_KEY_NAME)) { - int partition = (Integer) partitionObj; - partitionDirs.put(new TopicPartition(topic, partition), logDir); - } - } - } + this.data = new AlterReplicaLogDirsRequestData(struct, version); } - public AlterReplicaLogDirsRequest(Map<TopicPartition, String> partitionDirs, short version) { + public AlterReplicaLogDirsRequest(AlterReplicaLogDirsRequestData data, short version) { super(ApiKeys.ALTER_REPLICA_LOG_DIRS, version); - this.partitionDirs = partitionDirs; + this.data = data; } @Override protected Struct toStruct() { - Map<String, List<TopicPartition>> dirPartitions = new HashMap<>(); - for (Map.Entry<TopicPartition, String> entry: partitionDirs.entrySet()) { - if (!dirPartitions.containsKey(entry.getValue())) - dirPartitions.put(entry.getValue(), new ArrayList<>()); - dirPartitions.get(entry.getValue()).add(entry.getKey()); - } + return data.toStruct(version()); + } - Struct struct = new Struct(ApiKeys.ALTER_REPLICA_LOG_DIRS.requestSchema(version())); - List<Struct> logDirStructArray = new ArrayList<>(); - for (Map.Entry<String, List<TopicPartition>> logDirEntry: dirPartitions.entrySet()) { - Struct logDirStruct = struct.instance(LOG_DIRS_KEY_NAME); - logDirStruct.set(LOG_DIR_KEY_NAME, logDirEntry.getKey()); - - List<Struct> topicStructArray = new ArrayList<>(); - for (Map.Entry<String, List<Integer>> topicEntry: CollectionUtils.groupPartitionsByTopic(logDirEntry.getValue()).entrySet()) { - Struct topicStruct = logDirStruct.instance(TOPICS_KEY_NAME); - topicStruct.set(TOPIC_NAME, topicEntry.getKey()); - topicStruct.set(PARTITIONS_KEY_NAME, topicEntry.getValue().toArray()); - topicStructArray.add(topicStruct); - } - logDirStruct.set(TOPICS_KEY_NAME, topicStructArray.toArray()); - logDirStructArray.add(logDirStruct); - } - struct.set(LOG_DIRS_KEY_NAME, logDirStructArray.toArray()); - return struct; + @Override + public AlterReplicaLogDirsResponse getErrorResponse(Throwable e) { Review comment: nit: This one is not used if not mistaken. Let's remove it. ########## File path: core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala ########## @@ -36,6 +37,13 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { val topic = "topic" + private def findErrorForPartition(response: AlterReplicaLogDirsResponse, tp: TopicPartition): Errors = { + Errors.forCode(response.data.results.asScala.groupBy(x => x.topicName).get(tp.topic).get Review comment: I find the implementation that you did in `AuthorizerIntegrationTest` at L195 easier to read. Could we use it here as well to stay consistent? ########## File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java ########## @@ -3382,6 +3388,78 @@ public void testAlterClientQuotas() throws Exception { } } + @Test + public void testAlterReplicaLogDirsSuccess() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + createAlterLogDirsResponse(env, node0, Errors.NONE, 0); + createAlterLogDirsResponse(env, node1, Errors.NONE, 0); + + TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); + TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); + + Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); + logDirs.put(tpr0, "/data0"); + logDirs.put(tpr1, "/data1"); + AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); + result.values().get(tpr0).get(); + result.values().get(tpr1).get(); + } + } + + @Test + public void testAlterReplicaLogDirsLogDirNotFound() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + createAlterLogDirsResponse(env, node0, Errors.NONE, 0); + createAlterLogDirsResponse(env, node1, Errors.LOG_DIR_NOT_FOUND, 0); + + TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); + TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); + + Map<TopicPartitionReplica, String> logDirs = new HashMap<>(); + logDirs.put(tpr0, "/data0"); + logDirs.put(tpr1, "/data1"); + AlterReplicaLogDirsResult result = env.adminClient().alterReplicaLogDirs(logDirs); + result.values().get(tpr0).get(); + TestUtils.assertFutureError(result.values().get(tpr1), LogDirNotFoundException.class); + } + } + + @Test + public void testAlterReplicaLogDirsUnrequested() throws Exception { + Node node0 = new Node(0, "localhost", 8120); + Node node1 = new Node(1, "localhost", 8121); + try (AdminClientUnitTestEnv env = mockClientEnv()) { + createAlterLogDirsResponse(env, node0, Errors.NONE, 0); + createAlterLogDirsResponse(env, node1, Errors.NONE, 1, 2); + + TopicPartitionReplica tpr0 = new TopicPartitionReplica("topic", 0, 0); + TopicPartitionReplica tpr1 = new TopicPartitionReplica("topic", 0, 1); Review comment: Do you intentionally use a `TopicPartitionReplica` which is not provided in the response here? I guess that it is to ensure that its future is not completed, isn't it? ########## File path: core/src/test/scala/unit/kafka/server/AlterReplicaLogDirsRequestTest.scala ########## @@ -104,13 +112,26 @@ class AlterReplicaLogDirsRequestTest extends BaseRequestTest { partitionDirs3.put(new TopicPartition(topic, 1), validDir3) partitionDirs3.put(new TopicPartition(topic, 2), offlineDir) val alterReplicaDirResponse3 = sendAlterReplicaLogDirsRequest(partitionDirs3.toMap) - assertEquals(Errors.LOG_DIR_NOT_FOUND, alterReplicaDirResponse3.responses().get(new TopicPartition(topic, 0))) - assertEquals(Errors.KAFKA_STORAGE_ERROR, alterReplicaDirResponse3.responses().get(new TopicPartition(topic, 1))) - assertEquals(Errors.KAFKA_STORAGE_ERROR, alterReplicaDirResponse3.responses().get(new TopicPartition(topic, 2))) + assertEquals(Errors.LOG_DIR_NOT_FOUND, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 0))) + assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 1))) + assertEquals(Errors.KAFKA_STORAGE_ERROR, findErrorForPartition(alterReplicaDirResponse3, new TopicPartition(topic, 2))) } private def sendAlterReplicaLogDirsRequest(partitionDirs: Map[TopicPartition, String]): AlterReplicaLogDirsResponse = { - val request = new AlterReplicaLogDirsRequest.Builder(partitionDirs.asJava).build() + val logDirs = partitionDirs.groupBy{case (_, dir) => dir}.map{ case(dir, tps) => + new AlterReplicaLogDirsRequestData.AlterReplicaLogDir() + .setPath(dir) + .setTopics(new AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopicCollection( + tps.groupBy { case (tp, _) => tp.topic } + .map { case (topic, tpPartitions) => + new AlterReplicaLogDirsRequestData.AlterReplicaLogDirTopic() + .setName(topic) + .setPartitions(tpPartitions.map{case (tp, _) => tp.partition.asInstanceOf[Integer]}.toList.asJava) + }.toList.asJava.iterator)) Review comment: nit: Indentation looks wrong here. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org