yashmayya commented on code in PR #15930:
URL: https://github.com/apache/pinot/pull/15930#discussion_r2123362083
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java:
##########
@@ -764,6 +768,195 @@ public void testRebalanceStrictReplicaGroup()
}
}
+ @Test
+ public void
testRebalanceWithImplicitRealtimeTablePartitionSelectorAndMinimizeDataMovement()
+ throws Exception {
+ int numServers = 6;
+ int numPartitions = 18;
+ int numReplicas = 2;
+
+ for (int i = 0; i < numServers; i++) {
+ String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+ }
+
+ InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1,
true, null);
+ InstanceAssignmentConfig instanceAssignmentConfig =
+ new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(null), false, 0,
null), null,
+ replicaGroupPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(),
true);
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplicas)
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .setStreamConfigs(
+
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions).getStreamConfigsMap())
+
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(),
instanceAssignmentConfig))
+ .build();
+
+ // Create the table
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments
+ int numSegmentsPerPartition = 4;
+ for (int i = 0; i < numPartitions; i++) {
+ for (int j = 0; j < numSegmentsPerPartition; j++) {
+ _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME,
+ SEGMENT_NAME_PREFIX + (i * numSegmentsPerPartition + j),
PARTITION_COLUMN, i), null);
+ }
+ }
+
+ Map<String, Map<String, String>> oldSegmentAssignment =
+
_helixResourceManager.getTableIdealState(REALTIME_TABLE_NAME).getRecord().getMapFields();
+ for (Map.Entry<String, Map<String, String>> entry :
oldSegmentAssignment.entrySet()) {
+ assertEquals(entry.getValue().size(), numReplicas);
+ }
+
+ // Verify that segments are distributed equally across servers
+ Map<String, Integer> numSegmentsPerServer =
getNumSegmentsPerServer(oldSegmentAssignment);
+ for (int i = 0; i < numServers; i++) {
+ String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+ assertTrue(numSegmentsPerServer.containsKey(instanceId));
+ // Total number of segments is numReplicas * numPartitions *
(numSegmentsPerPartition + 1) because of
+ // CONSUMING segments
+ assertEquals(numSegmentsPerServer.get(instanceId),
+ (numReplicas * numPartitions * (numSegmentsPerPartition + 1)) /
numServers);
+ }
+
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, null, null);
+ // Rebalance should return NO_OP status since there has been no change
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+
+ // All servers should be assigned to the table
+ Map<InstancePartitionsType, InstancePartitions> instanceAssignment =
rebalanceResult.getInstanceAssignment();
+ assertEquals(instanceAssignment.size(), 1);
+ InstancePartitions instancePartitions =
instanceAssignment.get(InstancePartitionsType.CONSUMING);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+ assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+ // Verify that replica partitions are distributed equally across servers
+ Map<String, Integer> numReplicaPartitionsPerServer =
getNumReplicaPartitionsPerServer(instancePartitions);
+ for (int i = 0; i < numServers; i++) {
+ String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+ assertTrue(numReplicaPartitionsPerServer.containsKey(instanceId));
+ // Total number of partitions is numReplicas * numPartitions
+ assertEquals(numReplicaPartitionsPerServer.get(instanceId), (numReplicas
* numPartitions) / numServers);
+ }
+
+ // Segment assignment should not change
+ assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+ // Add two new servers
+ int numServersToAdd = 2;
+ Set<String> newServers = new HashSet<>();
+ for (int i = 0; i < numServersToAdd; i++) {
+ String instanceId = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+ newServers.add(instanceId);
+ }
+
+ // Rebalance with reassignInstances and minimizeDataMovement enabled
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setIncludeConsuming(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ instanceAssignment = rebalanceResult.getInstanceAssignment();
+ assertEquals(instanceAssignment.size(), 1);
+ instancePartitions =
instanceAssignment.get(InstancePartitionsType.CONSUMING);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+ assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+ // Get number of segments moved
+ int numSegmentsMoved = getNumSegmentsMoved(oldSegmentAssignment,
rebalanceResult.getSegmentAssignment());
+ // This number is 130 when using the default partition selector in this
same scenario since more segment partitions
+ // will be moved when the instance partitions don't match the segment
partitions (we're setting numPartitions to
+ // the default value of 0 in the table's instance assignment config).
+ assertEquals(numSegmentsMoved, 30);
Review Comment:
This assertion is the crux of the new test: it validates that the new instance
assignment strategy does indeed minimize data movement in scenarios like
https://github.com/apache/pinot/issues/14151.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]