clolov commented on code in PR #18983: URL: https://github.com/apache/kafka/pull/18983#discussion_r1965316204
########## clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java: ########## @@ -28,4 +28,25 @@ */ @InterfaceStability.Evolving public class AlterPartitionReassignmentsOptions extends AbstractOptions<AlterPartitionReassignmentsOptions> { + + private Boolean allowReplicationFactorChange = true; + + /** + * Set the option indicating if the alter partition reassignments call should be + * allowed to alter the replication factor of a partition. + * In cases where it is not allowed, any replication factor change will result in an exception thrown by the API. + */ + public boolean allowReplicationFactorChange(boolean allow) { + this.allowReplicationFactorChange = allow; + return allowReplicationFactorChange; Review Comment: On a quick cross-check with other *Options it appears that for such methods we return the *Options class itself. In other words, something along these lines ``` public AlterPartitionReassignmentsOptions allowReplicationFactorChange(boolean allow) { this.allowReplicationFactorChange = allow; return this; } ``` ########## metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java: ########## @@ -1922,6 +1922,135 @@ public void testReassignPartitions(short version) { assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE)); } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) + public void testAlterPartitionDisallowReplicationFactorChange(short version) { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2, 3); + ctx.unfenceBrokers(0, 1, 2, 3); + ctx.createTestTopic("foo", new int[][] {new int[] {1, 2, 3}, new int[] {1, 2, 3}, new int[] {1, 2, 3}}); + + ControllerResult<AlterPartitionReassignmentsResponseData> alterResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(singletonList( + new ReassignableTopic().setName("foo").setPartitions(asList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(asList(0, 1, 2)), + new ReassignablePartition().setPartitionIndex(1). + setReplicas(asList(0, 1)), + new ReassignablePartition().setPartitionIndex(2). + setReplicas(asList(0, 1, 2, 3)))))). + setAllowReplicationFactorChange(false)); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList( + new ReassignableTopicResponse().setName("foo").setPartitions(asList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(1). + setErrorCode(INVALID_REPLICATION_FACTOR.code()). + setErrorMessage("The replication factor is changed from 3 to 2"), + new ReassignablePartitionResponse().setPartitionIndex(2). + setErrorCode(INVALID_REPLICATION_FACTOR.code()). + setErrorMessage("The replication factor is changed from 3 to 4"))))), + alterResult.response()); + ctx.replay(alterResult.records()); + ListPartitionReassignmentsResponseData currentReassigning = + new ListPartitionReassignmentsResponseData().setErrorMessage(null). + setTopics(singletonList(new OngoingTopicReassignment(). + setName("foo").setPartitions(singletonList( + new OngoingPartitionReassignment().setPartitionIndex(0). + setRemovingReplicas(singletonList(3)). Review Comment: Nit: Partition 0 started with replicas (0, 1, 2). For the sake of continuity, could you remove 0 and add 3 rather than the other way around? ########## metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java: ########## @@ -2406,6 +2414,31 @@ private void updatePartitionInfo( newPartInfo.elr); } + private void validatePartitionReplicationFactorUnchanged( + PartitionRegistration part, Review Comment: Nit: Could you align the first argument with the opening bracket as other methods in this file? ########## metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java: ########## @@ -1922,6 +1922,135 @@ public void testReassignPartitions(short version) { assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE)); } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) + public void testAlterPartitionDisallowReplicationFactorChange(short version) { Review Comment: Maybe there already are such tests, in which case please point them out to me, but if not, could you add the same tests but with `setAllowReplicationFactorChange(true)`? ########## metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java: ########## @@ -1922,6 +1922,135 @@ public void testReassignPartitions(short version) { assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE)); } + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) + public void testAlterPartitionDisallowReplicationFactorChange(short version) { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2, 3); + ctx.unfenceBrokers(0, 1, 2, 3); + ctx.createTestTopic("foo", new int[][] {new int[] {1, 2, 3}, new int[] {1, 2, 3}, new int[] {1, 2, 3}}); + + ControllerResult<AlterPartitionReassignmentsResponseData> alterResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(singletonList( + new ReassignableTopic().setName("foo").setPartitions(asList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(asList(0, 1, 2)), + new ReassignablePartition().setPartitionIndex(1). + setReplicas(asList(0, 1)), + new ReassignablePartition().setPartitionIndex(2). + setReplicas(asList(0, 1, 2, 3)))))). + setAllowReplicationFactorChange(false)); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList( + new ReassignableTopicResponse().setName("foo").setPartitions(asList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorMessage(null), + new ReassignablePartitionResponse().setPartitionIndex(1). + setErrorCode(INVALID_REPLICATION_FACTOR.code()). + setErrorMessage("The replication factor is changed from 3 to 2"), + new ReassignablePartitionResponse().setPartitionIndex(2). + setErrorCode(INVALID_REPLICATION_FACTOR.code()). + setErrorMessage("The replication factor is changed from 3 to 4"))))), + alterResult.response()); + ctx.replay(alterResult.records()); + ListPartitionReassignmentsResponseData currentReassigning = + new ListPartitionReassignmentsResponseData().setErrorMessage(null). + setTopics(singletonList(new OngoingTopicReassignment(). + setName("foo").setPartitions(singletonList( + new OngoingPartitionReassignment().setPartitionIndex(0). + setRemovingReplicas(singletonList(3)). + setAddingReplicas(singletonList(0)). + setReplicas(asList(0, 1, 2, 3)))))); + assertEquals(currentReassigning, replication.listPartitionReassignments(singletonList( + new ListPartitionReassignmentsTopics().setName("foo"). + setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE)); + + // test alter replica factor not allow to change when partition reassignment is ongoing + ControllerResult<AlterPartitionReassignmentsResponseData> alterReassigningResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(singletonList( + new ReassignableTopic().setName("foo").setPartitions(singletonList( + new ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 1)))))). + setAllowReplicationFactorChange(false)); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList( + new ReassignableTopicResponse().setName("foo").setPartitions(singletonList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorCode(INVALID_REPLICATION_FACTOR.code()). + setErrorMessage("The replication factor is changed from 3 to 2"))))), + alterReassigningResult.response()); + + ControllerResult<AlterPartitionReassignmentsResponseData> alterReassigningResult2 = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(singletonList( + new ReassignableTopic().setName("foo").setPartitions(singletonList( + new ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 2, 3)))))). + setAllowReplicationFactorChange(false)); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList( + new ReassignableTopicResponse().setName("foo").setPartitions(singletonList( + new ReassignablePartitionResponse().setPartitionIndex(0). + setErrorMessage(null))))), + alterReassigningResult2.response()); + } + + @ParameterizedTest + @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION) + public void testDisallowReplicationFactorChangeNoEffectWhenCancelAlterPartition(short version) { + MetadataVersion metadataVersion = MetadataVersion.latestTesting(); + ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder() + .setMetadataVersion(metadataVersion) + .build(); + ReplicationControlManager replication = ctx.replicationControl; + ctx.registerBrokers(0, 1, 2, 3); + ctx.unfenceBrokers(0, 1, 2, 3); + ctx.createTestTopic("foo", new int[][] {new int[] {1, 2, 3}}).topicId(); + + ControllerResult<AlterPartitionReassignmentsResponseData> alterResult = + replication.alterPartitionReassignments( + new AlterPartitionReassignmentsRequestData().setTopics(singletonList( + new ReassignableTopic().setName("foo").setPartitions(singletonList( + new ReassignablePartition().setPartitionIndex(0). + setReplicas(asList(0, 1, 2))))))); + assertEquals(new AlterPartitionReassignmentsResponseData(). + setErrorMessage(null).setResponses(singletonList( + new ReassignableTopicResponse().setName("foo").setPartitions(singletonList( + new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))), + alterResult.response()); + ctx.replay(alterResult.records()); + ListPartitionReassignmentsResponseData currentReassigning = + new ListPartitionReassignmentsResponseData().setErrorMessage(null). + setTopics(singletonList(new OngoingTopicReassignment(). + setName("foo").setPartitions(singletonList( + new OngoingPartitionReassignment().setPartitionIndex(0). + setRemovingReplicas(singletonList(3)). Review Comment: Nit: Same comment as in the previous test ########## tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java: ########## @@ -432,6 +433,21 @@ topicPartition, new PartitionReassignmentState(asList(0, 1, 2), asList(0, 1, 2), } } + @ClusterTest + public void testDisallowReplicationFactorChange() { Review Comment: Could you also add a test (or build on this one) which showcases that **increasing** the replication factor also fails? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org