clolov commented on code in PR #18983:
URL: https://github.com/apache/kafka/pull/18983#discussion_r1965316204


##########
clients/src/main/java/org/apache/kafka/clients/admin/AlterPartitionReassignmentsOptions.java:
##########
@@ -28,4 +28,25 @@
  */
 @InterfaceStability.Evolving
 public class AlterPartitionReassignmentsOptions extends AbstractOptions<AlterPartitionReassignmentsOptions> {
+
+    private Boolean allowReplicationFactorChange = true;
+
+    /**
+     * Set the option indicating if the alter partition reassignments call should be
+     * allowed to alter the replication factor of a partition.
+     * In cases where it is not allowed, any replication factor change will result in an exception thrown by the API.
+     */
+    public boolean allowReplicationFactorChange(boolean allow) {
+        this.allowReplicationFactorChange = allow;
+        return allowReplicationFactorChange;

Review Comment:
   A quick cross-check against the other *Options classes suggests that setters like 
this one return the *Options instance itself rather than the value that was set. 
In other words, something along these lines:
   ```
   public AlterPartitionReassignmentsOptions allowReplicationFactorChange(boolean allow) {
       this.allowReplicationFactorChange = allow;
       return this;
   }
   ```
   ```



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -1922,6 +1922,135 @@ public void testReassignPartitions(short version) {
         assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionDisallowReplicationFactorChange(short version) {
+        MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+                .setMetadataVersion(metadataVersion)
+                .build();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3);
+        ctx.unfenceBrokers(0, 1, 2, 3);
+        ctx.createTestTopic("foo", new int[][] {new int[] {1, 2, 3}, new int[] {1, 2, 3}, new int[] {1, 2, 3}});
+
+        ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+                replication.alterPartitionReassignments(
+                        new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+                                new ReassignableTopic().setName("foo").setPartitions(asList(
+                                        new ReassignablePartition().setPartitionIndex(0).
+                                                setReplicas(asList(0, 1, 2)),
+                                        new ReassignablePartition().setPartitionIndex(1).
+                                                setReplicas(asList(0, 1)),
+                                        new ReassignablePartition().setPartitionIndex(2).
+                                                setReplicas(asList(0, 1, 2, 3)))))).
+                                setAllowReplicationFactorChange(false));
+        assertEquals(new AlterPartitionReassignmentsResponseData().
+                        setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+                                new ReassignableTopicResponse().setName("foo").setPartitions(asList(
+                                        new ReassignablePartitionResponse().setPartitionIndex(0).
+                                                setErrorMessage(null),
+                                        new ReassignablePartitionResponse().setPartitionIndex(1).
+                                                setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+                                                setErrorMessage("The replication factor is changed from 3 to 2"),
+                                        new ReassignablePartitionResponse().setPartitionIndex(2).
+                                                setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+                                                setErrorMessage("The replication factor is changed from 3 to 4"))))),
+                alterResult.response());
+        ctx.replay(alterResult.records());
+        ListPartitionReassignmentsResponseData currentReassigning =
+                new ListPartitionReassignmentsResponseData().setErrorMessage(null).
+                        setTopics(singletonList(new OngoingTopicReassignment().
+                                setName("foo").setPartitions(singletonList(
+                                        new OngoingPartitionReassignment().setPartitionIndex(0).
+                                                setRemovingReplicas(singletonList(3)).

Review Comment:
   Nit: Partition 0 started with replicas (0, 1, 2). For the sake of 
continuity, could you remove 0 and add 3 rather than the other way around?
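
For reference when checking which replica should be added and which removed, here is a small sketch of how the adding and removing sets can be derived from a current and a target replica list. It assumes adding = target minus current, removing = current minus target, and a merged list of target replicas followed by the replicas being removed; `ReassignmentDiffSketch` is a hypothetical helper written for this example, not controller code. The inputs in the usage note are taken from the quoted hunk, where the topic is created with replicas {1, 2, 3} and the request for partition 0 targets (0, 1, 2).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for reasoning about a reassignment; not Kafka controller code.
class ReassignmentDiffSketch {
    // Replicas present in the target but not in the current assignment.
    static List<Integer> adding(List<Integer> current, List<Integer> target) {
        List<Integer> result = new ArrayList<>(target);
        result.removeAll(current);
        return result;
    }

    // Replicas present in the current assignment but not in the target.
    static List<Integer> removing(List<Integer> current, List<Integer> target) {
        List<Integer> result = new ArrayList<>(current);
        result.removeAll(target);
        return result;
    }

    // Assumed merged view while the reassignment is in flight:
    // target replicas first, then any current replicas that are being removed.
    static List<Integer> merged(List<Integer> current, List<Integer> target) {
        List<Integer> result = new ArrayList<>(target);
        for (Integer replica : current) {
            if (!result.contains(replica)) {
                result.add(replica);
            }
        }
        return result;
    }
}
```

With `current = List.of(1, 2, 3)` and `target = List.of(0, 1, 2)`, the sketch gives adding `[0]`, removing `[3]`, and a merged list of `[0, 1, 2, 3]`.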



##########
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java:
##########
@@ -2406,6 +2414,31 @@ private void updatePartitionInfo(
             newPartInfo.elr);
     }
 
+    private void validatePartitionReplicationFactorUnchanged(
+            PartitionRegistration part,

Review Comment:
   Nit: Could you align the first argument with the opening bracket, as other 
methods in this file do?



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -1922,6 +1922,135 @@ public void testReassignPartitions(short version) {
         assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionDisallowReplicationFactorChange(short version) {

Review Comment:
   Maybe there already are such tests, in which case please point them out to 
me, but if not, could you add the same tests but with 
`setAllowReplicationFactorChange(true)`?
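
To make the expected behaviour for both flag values concrete, here is a standalone sketch of the check the tests appear to exercise. It is an assumption inferred from the error messages in the quoted test, not the body of `validatePartitionReplicationFactorUnchanged`: the current replication factor is taken as the replica count minus any replicas still being added, and the message format is copied from the test expectations. `ReplicationFactorCheckSketch` is a hypothetical name.

```java
import java.util.List;
import java.util.Optional;

// Hypothetical standalone check; inferred from the test expectations, not the Kafka source.
class ReplicationFactorCheckSketch {
    // Returns an error message if the change is rejected, or empty if it is accepted.
    static Optional<String> validate(List<Integer> currentReplicas,
                                     List<Integer> addingReplicas,
                                     List<Integer> targetReplicas,
                                     boolean allowChange) {
        if (allowChange) {
            // When changes are allowed, any target size is accepted.
            return Optional.empty();
        }
        // Assumption: replicas still being added do not count toward the current factor.
        int currentFactor = currentReplicas.size() - addingReplicas.size();
        int targetFactor = targetReplicas.size();
        if (currentFactor == targetFactor) {
            return Optional.empty();
        }
        return Optional.of("The replication factor is changed from "
                + currentFactor + " to " + targetFactor);
    }
}
```

A test with the flag set to `true` would then expect no error for the same requests that shrink the replica list to (0, 1) or grow it to (0, 1, 2, 3), while the `false` case rejects both.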



##########
metadata/src/test/java/org/apache/kafka/controller/ReplicationControlManagerTest.java:
##########
@@ -1922,6 +1922,135 @@ public void testReassignPartitions(short version) {
         assertEquals(NONE_REASSIGNING, replication.listPartitionReassignments(null, Long.MAX_VALUE));
     }
 
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testAlterPartitionDisallowReplicationFactorChange(short version) {
+        MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+                .setMetadataVersion(metadataVersion)
+                .build();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3);
+        ctx.unfenceBrokers(0, 1, 2, 3);
+        ctx.createTestTopic("foo", new int[][] {new int[] {1, 2, 3}, new int[] {1, 2, 3}, new int[] {1, 2, 3}});
+
+        ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+                replication.alterPartitionReassignments(
+                        new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+                                new ReassignableTopic().setName("foo").setPartitions(asList(
+                                        new ReassignablePartition().setPartitionIndex(0).
+                                                setReplicas(asList(0, 1, 2)),
+                                        new ReassignablePartition().setPartitionIndex(1).
+                                                setReplicas(asList(0, 1)),
+                                        new ReassignablePartition().setPartitionIndex(2).
+                                                setReplicas(asList(0, 1, 2, 3)))))).
+                                setAllowReplicationFactorChange(false));
+        assertEquals(new AlterPartitionReassignmentsResponseData().
+                        setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+                                new ReassignableTopicResponse().setName("foo").setPartitions(asList(
+                                        new ReassignablePartitionResponse().setPartitionIndex(0).
+                                                setErrorMessage(null),
+                                        new ReassignablePartitionResponse().setPartitionIndex(1).
+                                                setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+                                                setErrorMessage("The replication factor is changed from 3 to 2"),
+                                        new ReassignablePartitionResponse().setPartitionIndex(2).
+                                                setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+                                                setErrorMessage("The replication factor is changed from 3 to 4"))))),
+                alterResult.response());
+        ctx.replay(alterResult.records());
+        ListPartitionReassignmentsResponseData currentReassigning =
+                new ListPartitionReassignmentsResponseData().setErrorMessage(null).
+                        setTopics(singletonList(new OngoingTopicReassignment().
+                                setName("foo").setPartitions(singletonList(
+                                        new OngoingPartitionReassignment().setPartitionIndex(0).
+                                                setRemovingReplicas(singletonList(3)).
+                                                setAddingReplicas(singletonList(0)).
+                                                setReplicas(asList(0, 1, 2, 3))))));
+        assertEquals(currentReassigning, replication.listPartitionReassignments(singletonList(
+                new ListPartitionReassignmentsTopics().setName("foo").
+                        setPartitionIndexes(asList(0, 1, 2))), Long.MAX_VALUE));
+
+        // test alter replica factor not allow to change when partition reassignment is ongoing
+        ControllerResult<AlterPartitionReassignmentsResponseData> alterReassigningResult =
+                replication.alterPartitionReassignments(
+                        new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+                                new ReassignableTopic().setName("foo").setPartitions(singletonList(
+                                        new ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 1)))))).
+                                setAllowReplicationFactorChange(false));
+        assertEquals(new AlterPartitionReassignmentsResponseData().
+                        setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+                                new ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+                                        new ReassignablePartitionResponse().setPartitionIndex(0).
+                                                setErrorCode(INVALID_REPLICATION_FACTOR.code()).
+                                                setErrorMessage("The replication factor is changed from 3 to 2"))))),
+                alterReassigningResult.response());
+
+        ControllerResult<AlterPartitionReassignmentsResponseData> alterReassigningResult2 =
+                replication.alterPartitionReassignments(
+                        new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+                                        new ReassignableTopic().setName("foo").setPartitions(singletonList(
+                                                new ReassignablePartition().setPartitionIndex(0).setReplicas(asList(0, 2, 3)))))).
+                                setAllowReplicationFactorChange(false));
+        assertEquals(new AlterPartitionReassignmentsResponseData().
+                        setErrorMessage(null).setAllowReplicationFactorChange(false).setResponses(singletonList(
+                                new ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+                                        new ReassignablePartitionResponse().setPartitionIndex(0).
+                                                setErrorMessage(null))))),
+                alterReassigningResult2.response());
+    }
+
+    @ParameterizedTest
+    @ApiKeyVersionsSource(apiKey = ApiKeys.ALTER_PARTITION)
+    public void testDisallowReplicationFactorChangeNoEffectWhenCancelAlterPartition(short version) {
+        MetadataVersion metadataVersion = MetadataVersion.latestTesting();
+        ReplicationControlTestContext ctx = new ReplicationControlTestContext.Builder()
+                .setMetadataVersion(metadataVersion)
+                .build();
+        ReplicationControlManager replication = ctx.replicationControl;
+        ctx.registerBrokers(0, 1, 2, 3);
+        ctx.unfenceBrokers(0, 1, 2, 3);
+        ctx.createTestTopic("foo", new int[][] {new int[] {1, 2, 3}}).topicId();
+
+        ControllerResult<AlterPartitionReassignmentsResponseData> alterResult =
+                replication.alterPartitionReassignments(
+                        new AlterPartitionReassignmentsRequestData().setTopics(singletonList(
+                                        new ReassignableTopic().setName("foo").setPartitions(singletonList(
+                                                new ReassignablePartition().setPartitionIndex(0).
+                                                        setReplicas(asList(0, 1, 2)))))));
+        assertEquals(new AlterPartitionReassignmentsResponseData().
+                        setErrorMessage(null).setResponses(singletonList(
+                                new ReassignableTopicResponse().setName("foo").setPartitions(singletonList(
+                                        new ReassignablePartitionResponse().setPartitionIndex(0).setErrorMessage(null))))),
+                alterResult.response());
+        ctx.replay(alterResult.records());
+        ListPartitionReassignmentsResponseData currentReassigning =
+                new ListPartitionReassignmentsResponseData().setErrorMessage(null).
+                        setTopics(singletonList(new OngoingTopicReassignment().
+                                setName("foo").setPartitions(singletonList(
+                                        new OngoingPartitionReassignment().setPartitionIndex(0).
+                                                setRemovingReplicas(singletonList(3)).

Review Comment:
   Nit: Same comment as in the previous test



##########
tools/src/test/java/org/apache/kafka/tools/reassign/ReassignPartitionsCommandTest.java:
##########
@@ -432,6 +433,21 @@ topicPartition, new PartitionReassignmentState(asList(0, 1, 2), asList(0, 1, 2),
         }
     }
 
+    @ClusterTest
+    public void testDisallowReplicationFactorChange() {

Review Comment:
   Could you also add a test (or build on this one) which showcases that 
**increasing** the replication factor also fails?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

