vvivekiyer commented on code in PR #11578:
URL: https://github.com/apache/pinot/pull/11578#discussion_r1372113740
##########
pinot-common/src/main/java/org/apache/pinot/common/assignment/InstanceAssignmentConfigUtils.java:
##########
@@ -124,4 +124,14 @@ public static InstanceAssignmentConfig
getInstanceAssignmentConfig(TableConfig t
return new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig);
}
+
+ public static boolean isPreConfigurationBasedAssignment(TableConfig
tableConfig,
+ InstancePartitionsType instancePartitionsType) {
+ // If the instance assignment config is not null and the partition
selector is
+ // PRE_CONFIGURATION_BASED_PARTITION_SELECTOR,
Review Comment:
Update comment and function name.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java:
##########
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Detailed design see
https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8
+ * During each creation/update/scale, the algorithm will refer to the
corresponding tenant level instance partitions and
+ * generate an instance partition by taking numInstancePerReplicaGroup mirror
server sets from the tenant level
+ * instance partitions.
+ *
+ * If an existingInstancePartition is provided, the algorithm will generate a
best effort assignment that resembles
+ * the existingInstancePartition.
+ *
+ * Assumptions for this algorithm:
+ * 1. The number of replica groups in the tenant level instance partitions is
the same as the number of replica groups
+ * in the table config.
+ * 2. The number of partitions at replica group level is 1
+ * 3. This algorithm only works for replica group based table assignment
+ */
+public class MirrorServerSetInstancePartitionSelector extends
InstancePartitionSelector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class);
+ private final InstancePartitions _preConfiguredInstancePartitions;
+
+ // dimensions of target instance partition
+ private final int _numTargetInstancesPerReplicaGroup;
+ private final int _numTargetReplicaGroups;
+ private final int _numTargetTotalInstances;
+
+ // dimensions of pre-configured instance partition
+ private int _numPreConfiguredReplicaGroups;
+ private int _numPreConfiguredInstancesPerReplicaGroup;
+
+ // dimensions of existing instance partition
+ private int _numExistingReplicaGroups;
+ private int _numExistingInstancesPerReplicaGroup;
+
+ // look up tables for pre-configured instance partition
+ private final List<List<String>> _preConfiguredMirroredServerLists = new
ArrayList<>();
+ private final Map<String, Integer> _preConfiguredInstanceNameToOffsetMap =
new HashMap<>();
+
+ private final List<List<String>> _existingMirroredServerLists = new
ArrayList<>();
+
+ public
MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig
replicaGroupPartitionConfig,
+ String tableNameWithType, @Nullable InstancePartitions
existingInstancePartitions,
+ InstancePartitions preConfiguredInstancePartitions) {
+ super(replicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
+ _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
+ _numTargetInstancesPerReplicaGroup =
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+ _numTargetReplicaGroups =
_replicaGroupPartitionConfig.getNumReplicaGroups();
+ _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup *
_numTargetReplicaGroups;
+ }
+
+ /**
+ * validate if the poolToInstanceConfigsMap is a valid input for
pre-configuration based replica-group selection
+ */
+ private void validatePoolDiversePreconditions(Map<Integer,
List<InstanceConfig>> poolToInstanceConfigsMap) {
+
+ LOGGER.info("Validating pre-configured instance partitions for
pre-configuration based replica-group selection");
+
+ // numTargetInstancesPerReplica should be positive
+ Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0,
+ "Number of instances per replica must be positive");
+ LOGGER.info("Number of instances per replica: {}",
_numTargetInstancesPerReplicaGroup);
+ // _numTargetReplicaGroups should be positive
+ Preconditions.checkState(_numTargetReplicaGroups > 0, "Number of
replica-groups must be positive");
+ LOGGER.info("Number of replica-groups: {}", _numTargetReplicaGroups);
+ // validate target partition count is 1
+ Preconditions.checkState(_replicaGroupPartitionConfig.getNumPartitions()
<= 1,
+ "This algorithm does not support table level partitioning for target
assignment");
+ LOGGER.info("Number of partitions: {}",
_replicaGroupPartitionConfig.getNumPartitions());
+
+ // Validate the existing instance partitions is null or has only one
partition
+ Preconditions.checkState(
+ (_existingInstancePartitions == null ||
_existingInstancePartitions.getNumPartitions() == 1),
+ "This algorithm does not support table level partitioning for existing
assignment");
+ LOGGER.info("Number of partitions in existing instance partitions: {}",
_existingInstancePartitions == null ? 0
+ : _existingInstancePartitions.getNumPartitions());
+
+ _numExistingReplicaGroups =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getNumReplicaGroups();
+ _numExistingInstancesPerReplicaGroup =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getInstances(0, 0).size();
+
+ // Validate the pre-configured instance partitions is not null and has
only one partition
+ Preconditions.checkState(_preConfiguredInstancePartitions != null,
+ "Pre-configured instance partitions must be provided for
pre-configuration based selection");
+
Preconditions.checkState(_preConfiguredInstancePartitions.getNumPartitions() ==
1,
+ "This algorithm does not support table level partitioning for
pre-configured assignment");
+ LOGGER.info("Number of partitions in pre-configured instance partitions:
{}", _preConfiguredInstancePartitions
+ .getNumPartitions());
+
+ // Validate the number of replica-groups in the pre-configured instance
partitions is equal to the target
+ // number of replica-groups
+ _numPreConfiguredReplicaGroups =
_preConfiguredInstancePartitions.getNumReplicaGroups();
+ Preconditions.checkState(_numPreConfiguredReplicaGroups ==
_numTargetReplicaGroups,
+ "The number of replica-groups %s in the pre-configured instance
partitions "
+ + "is not equal to the target number of replica-groups %s",
_numPreConfiguredReplicaGroups,
+ _numTargetReplicaGroups);
+ LOGGER.info("Number of replica-groups in pre-configured instance
partitions: {}", _numPreConfiguredReplicaGroups);
+
+ // Validate the number of instances per replica-group in the
pre-configured instance partitions is greater than or
+ // equal to the target number of instances per replica-group
+ _numPreConfiguredInstancesPerReplicaGroup =
_preConfiguredInstancePartitions.getInstances(0, 0).size();
+ Preconditions.checkState(_numPreConfiguredInstancesPerReplicaGroup >=
_numTargetInstancesPerReplicaGroup,
+ "The number of instances per replica-group in the pre-configured "
Review Comment:
nit: indentation
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java:
##########
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Detailed design see
https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8
+ * During each creation/update/scale, the algorithm will refer to the
corresponding tenant level instance partitions and
+ * generate an instance partition by taking numInstancePerReplicaGroup mirror
server sets from the tenant level
+ * instance partitions.
+ *
+ * If an existingInstancePartition is provided, the algorithm will generate a
best effort assignment that resembles
+ * the existingInstancePartition.
+ *
+ * Assumptions for this algorithm:
+ * 1. The number of replica groups in the tenant level instance partitions is
the same as the number of replica groups
+ * in the table config.
+ * 2. The number of partitions at replica group level is 1
+ * 3. This algorithm only works for replica group based table assignment
+ */
+public class MirrorServerSetInstancePartitionSelector extends
InstancePartitionSelector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class);
+ private final InstancePartitions _preConfiguredInstancePartitions;
+
+ // dimensions of target instance partition
+ private final int _numTargetInstancesPerReplicaGroup;
+ private final int _numTargetReplicaGroups;
+ private final int _numTargetTotalInstances;
+
+ // dimensions of pre-configured instance partition
+ private int _numPreConfiguredReplicaGroups;
+ private int _numPreConfiguredInstancesPerReplicaGroup;
+
+ // dimensions of existing instance partition
+ private int _numExistingReplicaGroups;
+ private int _numExistingInstancesPerReplicaGroup;
+
+ // look up tables for pre-configured instance partition
+ private final List<List<String>> _preConfiguredMirroredServerLists = new
ArrayList<>();
+ private final Map<String, Integer> _preConfiguredInstanceNameToOffsetMap =
new HashMap<>();
+
+ private final List<List<String>> _existingMirroredServerLists = new
ArrayList<>();
+
+ public
MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig
replicaGroupPartitionConfig,
+ String tableNameWithType, @Nullable InstancePartitions
existingInstancePartitions,
+ InstancePartitions preConfiguredInstancePartitions) {
+ super(replicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
+ _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
+ _numTargetInstancesPerReplicaGroup =
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+ _numTargetReplicaGroups =
_replicaGroupPartitionConfig.getNumReplicaGroups();
+ _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup *
_numTargetReplicaGroups;
+ }
+
+ /**
+ * validate if the poolToInstanceConfigsMap is a valid input for
pre-configuration based replica-group selection
+ */
+ private void validatePoolDiversePreconditions(Map<Integer,
List<InstanceConfig>> poolToInstanceConfigsMap) {
+
+ LOGGER.info("Validating pre-configured instance partitions for
pre-configuration based replica-group selection");
+
+ // numTargetInstancesPerReplica should be positive
+ Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0,
+ "Number of instances per replica must be positive");
+ LOGGER.info("Number of instances per replica: {}",
_numTargetInstancesPerReplicaGroup);
+ // _numTargetReplicaGroups should be positive
+ Preconditions.checkState(_numTargetReplicaGroups > 0, "Number of
replica-groups must be positive");
+ LOGGER.info("Number of replica-groups: {}", _numTargetReplicaGroups);
+ // validate target partition count is 1
+ Preconditions.checkState(_replicaGroupPartitionConfig.getNumPartitions()
<= 1,
+ "This algorithm does not support table level partitioning for target
assignment");
+ LOGGER.info("Number of partitions: {}",
_replicaGroupPartitionConfig.getNumPartitions());
+
+ // Validate the existing instance partitions is null or has only one
partition
+ Preconditions.checkState(
+ (_existingInstancePartitions == null ||
_existingInstancePartitions.getNumPartitions() == 1),
+ "This algorithm does not support table level partitioning for existing
assignment");
+ LOGGER.info("Number of partitions in existing instance partitions: {}",
_existingInstancePartitions == null ? 0
+ : _existingInstancePartitions.getNumPartitions());
+
+ _numExistingReplicaGroups =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getNumReplicaGroups();
+ _numExistingInstancesPerReplicaGroup =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getInstances(0, 0).size();
+
+ // Validate the pre-configured instance partitions is not null and has
only one partition
+ Preconditions.checkState(_preConfiguredInstancePartitions != null,
+ "Pre-configured instance partitions must be provided for
pre-configuration based selection");
+
Preconditions.checkState(_preConfiguredInstancePartitions.getNumPartitions() ==
1,
+ "This algorithm does not support table level partitioning for
pre-configured assignment");
+ LOGGER.info("Number of partitions in pre-configured instance partitions:
{}", _preConfiguredInstancePartitions
+ .getNumPartitions());
+
+ // Validate the number of replica-groups in the pre-configured instance
partitions is equal to the target
+ // number of replica-groups
+ _numPreConfiguredReplicaGroups =
_preConfiguredInstancePartitions.getNumReplicaGroups();
+ Preconditions.checkState(_numPreConfiguredReplicaGroups ==
_numTargetReplicaGroups,
+ "The number of replica-groups %s in the pre-configured instance
partitions "
+ + "is not equal to the target number of replica-groups %s",
_numPreConfiguredReplicaGroups,
+ _numTargetReplicaGroups);
+ LOGGER.info("Number of replica-groups in pre-configured instance
partitions: {}", _numPreConfiguredReplicaGroups);
+
+ // Validate the number of instances per replica-group in the
pre-configured instance partitions is greater than or
+ // equal to the target number of instances per replica-group
+ _numPreConfiguredInstancesPerReplicaGroup =
_preConfiguredInstancePartitions.getInstances(0, 0).size();
+ Preconditions.checkState(_numPreConfiguredInstancesPerReplicaGroup >=
_numTargetInstancesPerReplicaGroup,
+ "The number of instances per replica-group in the pre-configured "
+ + "instance partitions is less than the target number of instances
per replica-group");
+ LOGGER.info("Number of instances per replica-group in pre-configured
instance partitions: {}",
+ _numPreConfiguredInstancesPerReplicaGroup);
+
+ // Validate the pool to instance configs map is not null or empty
+ Preconditions.checkNotNull(poolToInstanceConfigsMap,
"poolToInstanceConfigsMap is null");
+ int numPools = poolToInstanceConfigsMap.size();
+ Preconditions.checkState(numPools > 0, "No pool qualified for selection");
+
Preconditions.checkState(poolToInstanceConfigsMap.values().stream().map(List::size).reduce(Integer::sum)
+ .orElse(0) >= _numTargetTotalInstances,
+ "The total number of instances in all pools is less than the target
number of target instances");
+
+ HashSet<String> availableInstanceSet = new HashSet<>();
+ poolToInstanceConfigsMap.values().forEach(list -> list.forEach(i ->
availableInstanceSet.add(i.getInstanceName())));
+ LOGGER.info("Number of pools: {}", numPools);
+ LOGGER.info("Number of instances in all pools: {}",
availableInstanceSet.size());
+ LOGGER.info("availableInstanceSet: {}", availableInstanceSet);
+
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ List<String> instances =
_preConfiguredInstancePartitions.getInstances(0, i);
+ for (String instance : instances) {
+ Preconditions.checkState(availableInstanceSet.contains(instance),
+ "Instance %s in pre-configured instance partitions is not in "
+ + "the pool to instance configs map",
+ instance);
+ }
+ }
+
+ LOGGER.info("Validation passed. The instances provided can satisfy the
pool diverse requirement.");
+ LOGGER.info("Trying to assign total {} instances to {} replica groups, " +
"with {} instance per replica group",
+ _numTargetTotalInstances, _numTargetReplicaGroups,
_numTargetInstancesPerReplicaGroup);
+ }
+
+ void createListFromPreConfiguredInstanceAssignmentMap() {
+ List<List<String>> preConfiguredReplicaGroups = new
ArrayList<>(_numPreConfiguredReplicaGroups);
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+
preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0,
i));
+ }
+
+ for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) {
+ List<String> mirroredServerList = new ArrayList<>();
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ mirroredServerList.add(preConfiguredReplicaGroups.get(i).get(j));
+ }
+ _preConfiguredMirroredServerLists.add(mirroredServerList);
+ }
+ }
+
+ void createLookupTablesFromPreConfiguredInstanceAssignmentMap() {
+ List<List<String>> preConfiguredReplicaGroups = new
ArrayList<>(_numPreConfiguredReplicaGroups);
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+
preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0,
i));
+ }
+
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) {
+ String instance = preConfiguredReplicaGroups.get(i).get(j);
+ _preConfiguredInstanceNameToOffsetMap.put(instance, j);
+ }
+ }
+ }
+
+ @Override
+ public void selectInstances(Map<Integer, List<InstanceConfig>>
poolToInstanceConfigsMap,
+ InstancePartitions instancePartitions) {
+ if (_replicaGroupPartitionConfig.isReplicaGroupBased()) {
+ validatePoolDiversePreconditions(poolToInstanceConfigsMap);
+ if (_existingInstancePartitions == null) {
+ // If no existing instance partitions, create new instance partitions
based on the pre-configured instance
+ // partitions. This is done by just selecting
_targetNumInstancesPerReplicaGroup set of mirrored servers
+ // from the pre-configured instance partitions.
+ LOGGER.info("No existing instance partitions found. Will build new on
top of"
+ + " the pre-configured instance partitions");
+ // create a list of lists of mirrored servers from the pre-configured
instance partitions
+ createListFromPreConfiguredInstanceAssignmentMap();
Review Comment:
```suggestion
createMirrorServerList();
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -286,6 +290,81 @@ public String getTablesOnTenant(
}
}
+ @GET
+ @Path("/tenants/{tenantName}/instancePartitions")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.GET_INSTANCE_PARTITIONS)
+ @Authenticate(AccessType.READ)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Get the instance partitions of a tenant")
+ @ApiResponses(value = {@ApiResponse(code = 200, message = "Success",
response = InstancePartitions.class),
+ @ApiResponse(code = 404, message = "Instance partitions not found")})
+ public InstancePartitions getInstancePartitions(
+ @ApiParam(value = "Tenant name ", required = true)
@PathParam("tenantName") String tenantName,
+ @ApiParam(value = "instancePartitionType (OFFLINE|CONSUMING|COMPLETED)",
required = true,
+ allowableValues = "OFFLINE, CONSUMING, COMPLETED")
+ @QueryParam("instancePartitionType") String instancePartitionType) {
+ String tenantNameWithType =
InstancePartitionsType.valueOf(instancePartitionType)
+ .getInstancePartitionsName(tenantName);
+ InstancePartitions instancePartitions =
+
InstancePartitionsUtils.fetchInstancePartitions(_pinotHelixResourceManager.getPropertyStore(),
+ tenantNameWithType);
+
+ if (instancePartitions == null) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to find the instance partitions for %s",
tenantNameWithType),
+ Response.Status.NOT_FOUND);
+ } else {
+ return instancePartitions;
+ }
+ }
+
+ @PUT
+ @Path("/tenants/{tenantName}/instancePartitions")
+ @Authorize(targetType = TargetType.CLUSTER, action =
Actions.Cluster.UPDATE_INSTANCE_PARTITIONS)
+ @Authenticate(AccessType.UPDATE)
+ @Consumes(MediaType.APPLICATION_JSON)
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Update an instance partition for a server type in a
tenant")
+ @ApiResponses(value = {@ApiResponse(code = 200, message = "Success",
response = InstancePartitions.class),
+ @ApiResponse(code = 400, message = "Failed to update the tenant")})
Review Comment:
We also seem to be throwing an internal server error (500) from
`persistInstancePartitionsHelper`, so we need two documented response codes and messages:
400 Bad Request
500 Internal Server Error.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java:
##########
@@ -286,6 +290,81 @@ public String getTablesOnTenant(
}
}
+ @GET
+ @Path("/tenants/{tenantName}/instancePartitions")
Review Comment:
If we have a tenant with instancePartitions defined in PropertyStore, do you
think it is a requirement to enforce that all tables in the tenant follow this
new partitioning scheme? If yes, how can we enforce it?
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java:
##########
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Detailed design see
https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8
+ * During each creation/update/scale, the algorithm will refer to the
corresponding tenant level instance partitions and
+ * generate an instance partition by taking numInstancePerReplicaGroup mirror
server sets from the tenant level
+ * instance partitions.
+ *
+ * If an existingInstancePartition is provided, the algorithm will generate a
best effort assignment that resembles
+ * the existingInstancePartition.
+ *
+ * Assumptions for this algorithm:
+ * 1. The number of replica groups in the tenant level instance partitions is
the same as the number of replica groups
+ * in the table config.
+ * 2. The number of partitions at replica group level is 1
+ * 3. This algorithm only works for replica group based table assignment
+ */
+public class MirrorServerSetInstancePartitionSelector extends
InstancePartitionSelector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class);
+ private final InstancePartitions _preConfiguredInstancePartitions;
+
+ // dimensions of target instance partition
+ private final int _numTargetInstancesPerReplicaGroup;
+ private final int _numTargetReplicaGroups;
+ private final int _numTargetTotalInstances;
+
+ // dimensions of pre-configured instance partition
+ private int _numPreConfiguredReplicaGroups;
+ private int _numPreConfiguredInstancesPerReplicaGroup;
+
+ // dimensions of existing instance partition
+ private int _numExistingReplicaGroups;
+ private int _numExistingInstancesPerReplicaGroup;
+
+ // look up tables for pre-configured instance partition
+ private final List<List<String>> _preConfiguredMirroredServerLists = new
ArrayList<>();
+ private final Map<String, Integer> _preConfiguredInstanceNameToOffsetMap =
new HashMap<>();
+
+ private final List<List<String>> _existingMirroredServerLists = new
ArrayList<>();
+
+ public
MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig
replicaGroupPartitionConfig,
+ String tableNameWithType, @Nullable InstancePartitions
existingInstancePartitions,
+ InstancePartitions preConfiguredInstancePartitions) {
+ super(replicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
+ _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
+ _numTargetInstancesPerReplicaGroup =
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+ _numTargetReplicaGroups =
_replicaGroupPartitionConfig.getNumReplicaGroups();
+ _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup *
_numTargetReplicaGroups;
+ }
+
+ /**
+ * validate if the poolToInstanceConfigsMap is a valid input for
pre-configuration based replica-group selection
+ */
+ private void validatePoolDiversePreconditions(Map<Integer,
List<InstanceConfig>> poolToInstanceConfigsMap) {
+
+ LOGGER.info("Validating pre-configured instance partitions for
pre-configuration based replica-group selection");
+
+ // numTargetInstancesPerReplica should be positive
+ Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0,
Review Comment:
I meant to add the value of _numTargetInstancesPerReplicaGroup to the
Preconditions.checkState() error message. But this is fine too. If you decide to
keep the info log, please move it above the Preconditions check to help with debugging.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java:
##########
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Detailed design see
https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8
+ * During each creation/update/scale, the algorithm will refer to the
corresponding tenant level instance partitions and
+ * generate an instance partition by taking numInstancePerReplicaGroup mirror
server sets from the tenant level
+ * instance partitions.
+ *
+ * If an existingInstancePartition is provided, the algorithm will generate a
best effort assignment that resembles
+ * the existingInstancePartition.
+ *
+ * Assumptions for this algorithm:
+ * 1. The number of replica groups in the tenant level instance partitions is
the same as the number of replica groups
+ * in the table config.
+ * 2. The number of partitions at replica group level is 1
+ * 3. This algorithm only works for replica group based table assignment
+ */
+public class MirrorServerSetInstancePartitionSelector extends
InstancePartitionSelector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class);
+ private final InstancePartitions _preConfiguredInstancePartitions;
+
+ // dimensions of target instance partition
+ private final int _numTargetInstancesPerReplicaGroup;
+ private final int _numTargetReplicaGroups;
+ private final int _numTargetTotalInstances;
+
+ // dimensions of pre-configured instance partition
+ private int _numPreConfiguredReplicaGroups;
+ private int _numPreConfiguredInstancesPerReplicaGroup;
+
+ // dimensions of existing instance partition
+ private int _numExistingReplicaGroups;
+ private int _numExistingInstancesPerReplicaGroup;
+
+ // look up tables for pre-configured instance partition
+ private final List<List<String>> _preConfiguredMirroredServerLists = new
ArrayList<>();
+ private final Map<String, Integer> _preConfiguredInstanceNameToOffsetMap =
new HashMap<>();
+
+ private final List<List<String>> _existingMirroredServerLists = new
ArrayList<>();
+
+ public
MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig
replicaGroupPartitionConfig,
+ String tableNameWithType, @Nullable InstancePartitions
existingInstancePartitions,
+ InstancePartitions preConfiguredInstancePartitions) {
+ super(replicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
+ _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
+ _numTargetInstancesPerReplicaGroup =
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+ _numTargetReplicaGroups =
_replicaGroupPartitionConfig.getNumReplicaGroups();
+ _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup *
_numTargetReplicaGroups;
+ }
+
+ /**
+ * validate if the poolToInstanceConfigsMap is a valid input for
pre-configuration based replica-group selection
+ */
+ private void validatePoolDiversePreconditions(Map<Integer,
List<InstanceConfig>> poolToInstanceConfigsMap) {
+
+ LOGGER.info("Validating pre-configured instance partitions for
pre-configuration based replica-group selection");
+
+ // numTargetInstancesPerReplica should be positive
+ Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0,
+ "Number of instances per replica must be positive");
+ LOGGER.info("Number of instances per replica: {}",
_numTargetInstancesPerReplicaGroup);
+ // _numTargetReplicaGroups should be positive
+ Preconditions.checkState(_numTargetReplicaGroups > 0, "Number of
replica-groups must be positive");
+ LOGGER.info("Number of replica-groups: {}", _numTargetReplicaGroups);
+ // validate target partition count is 1
+ Preconditions.checkState(_replicaGroupPartitionConfig.getNumPartitions()
<= 1,
+ "This algorithm does not support table level partitioning for target
assignment");
+ LOGGER.info("Number of partitions: {}",
_replicaGroupPartitionConfig.getNumPartitions());
+
+ // Validate the existing instance partitions is null or has only one
partition
+ Preconditions.checkState(
+ (_existingInstancePartitions == null ||
_existingInstancePartitions.getNumPartitions() == 1),
+ "This algorithm does not support table level partitioning for existing
assignment");
+ LOGGER.info("Number of partitions in existing instance partitions: {}",
_existingInstancePartitions == null ? 0
+ : _existingInstancePartitions.getNumPartitions());
+
+ _numExistingReplicaGroups =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getNumReplicaGroups();
+ _numExistingInstancesPerReplicaGroup =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getInstances(0, 0).size();
+
+ // Validate the pre-configured instance partitions is not null and has
only one partition
+ Preconditions.checkState(_preConfiguredInstancePartitions != null,
Review Comment:
Should we also add a TableConfig-level validation that enforces that
`instancePartitionMap` is always set when MIRROR_SERVER_SET_PARTITION_SELECTOR is
used? It would be good to surface such misconfigurations to users at the moment
they make them, rather than later during instance assignment.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java:
##########
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Detailed design see
https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8
+ * During each creation/update/scale, the algorithm will refer to the
corresponding tenant level instance partitions and
+ * generate an instance partition by taking numInstancePerReplicaGroup mirror
server sets from the tenant level
+ * instance partitions.
+ *
+ * If an existingInstancePartition is provided, the algorithm will generate a
best effort assignment that resembles
+ * the existingInstancePartition.
+ *
+ * Assumptions for this algorithm:
+ * 1. The number of replica groups in the tenant level instance partitions is
the same as the number of replica groups
+ * in the table config.
+ * 2. The number of partitions at replica group level is 1
+ * 3. This algorithm only works for replica group based table assignment
+ */
+public class MirrorServerSetInstancePartitionSelector extends
InstancePartitionSelector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class);
+ private final InstancePartitions _preConfiguredInstancePartitions;
+
+ // dimensions of target instance partition
+ private final int _numTargetInstancesPerReplicaGroup;
+ private final int _numTargetReplicaGroups;
+ private final int _numTargetTotalInstances;
+
+ // dimensions of pre-configured instance partition
+ private int _numPreConfiguredReplicaGroups;
+ private int _numPreConfiguredInstancesPerReplicaGroup;
+
+ // dimensions of existing instance partition
+ private int _numExistingReplicaGroups;
+ private int _numExistingInstancesPerReplicaGroup;
+
+ // look up tables for pre-configured instance partition
+ private final List<List<String>> _preConfiguredMirroredServerLists = new
ArrayList<>();
+ private final Map<String, Integer> _preConfiguredInstanceNameToOffsetMap =
new HashMap<>();
+
+ private final List<List<String>> _existingMirroredServerLists = new
ArrayList<>();
+
+ public
MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig
replicaGroupPartitionConfig,
+ String tableNameWithType, @Nullable InstancePartitions
existingInstancePartitions,
+ InstancePartitions preConfiguredInstancePartitions) {
+ super(replicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
+ _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
+ _numTargetInstancesPerReplicaGroup =
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+ _numTargetReplicaGroups =
_replicaGroupPartitionConfig.getNumReplicaGroups();
+ _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup *
_numTargetReplicaGroups;
+ }
+
+ /**
+ * validate if the poolToInstanceConfigsMap is a valid input for
pre-configuration based replica-group selection
+ */
+ private void validatePoolDiversePreconditions(Map<Integer,
List<InstanceConfig>> poolToInstanceConfigsMap) {
+
+ LOGGER.info("Validating pre-configured instance partitions for
pre-configuration based replica-group selection");
+
+ // numTargetInstancesPerReplica should be positive
+ Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0,
+ "Number of instances per replica must be positive");
+ LOGGER.info("Number of instances per replica: {}",
_numTargetInstancesPerReplicaGroup);
+ // _numTargetReplicaGroups should be positive
+ Preconditions.checkState(_numTargetReplicaGroups > 0, "Number of
replica-groups must be positive");
+ LOGGER.info("Number of replica-groups: {}", _numTargetReplicaGroups);
+ // validate target partition count is 1
+ Preconditions.checkState(_replicaGroupPartitionConfig.getNumPartitions()
<= 1,
+ "This algorithm does not support table level partitioning for target
assignment");
+ LOGGER.info("Number of partitions: {}",
_replicaGroupPartitionConfig.getNumPartitions());
+
+ // Validate the existing instance partitions is null or has only one
partition
+ Preconditions.checkState(
+ (_existingInstancePartitions == null ||
_existingInstancePartitions.getNumPartitions() == 1),
+ "This algorithm does not support table level partitioning for existing
assignment");
+ LOGGER.info("Number of partitions in existing instance partitions: {}",
_existingInstancePartitions == null ? 0
+ : _existingInstancePartitions.getNumPartitions());
+
+ _numExistingReplicaGroups =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getNumReplicaGroups();
+ _numExistingInstancesPerReplicaGroup =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getInstances(0, 0).size();
+
+ // Validate the pre-configured instance partitions is not null and has
only one partition
+ Preconditions.checkState(_preConfiguredInstancePartitions != null,
+ "Pre-configured instance partitions must be provided for
pre-configuration based selection");
+
Preconditions.checkState(_preConfiguredInstancePartitions.getNumPartitions() ==
1,
+ "This algorithm does not support table level partitioning for
pre-configured assignment");
+ LOGGER.info("Number of partitions in pre-configured instance partitions:
{}", _preConfiguredInstancePartitions
+ .getNumPartitions());
+
+ // Validate the number of replica-groups in the pre-configured instance
partitions is equal to the target
+ // number of replica-groups
+ _numPreConfiguredReplicaGroups =
_preConfiguredInstancePartitions.getNumReplicaGroups();
+ Preconditions.checkState(_numPreConfiguredReplicaGroups ==
_numTargetReplicaGroups,
+ "The number of replica-groups %s in the pre-configured instance
partitions "
+ + "is not equal to the target number of replica-groups %s",
_numPreConfiguredReplicaGroups,
+ _numTargetReplicaGroups);
+ LOGGER.info("Number of replica-groups in pre-configured instance
partitions: {}", _numPreConfiguredReplicaGroups);
+
+ // Validate the number of instances per replica-group in the
pre-configured instance partitions is greater than or
+ // equal to the target number of instances per replica-group
+ _numPreConfiguredInstancesPerReplicaGroup =
_preConfiguredInstancePartitions.getInstances(0, 0).size();
+ Preconditions.checkState(_numPreConfiguredInstancesPerReplicaGroup >=
_numTargetInstancesPerReplicaGroup,
+ "The number of instances per replica-group in the pre-configured "
+ + "instance partitions is less than the target number of instances
per replica-group");
+ LOGGER.info("Number of instances per replica-group in pre-configured
instance partitions: {}",
+ _numPreConfiguredInstancesPerReplicaGroup);
+
+ // Validate the pool to instance configs map is not null or empty
+ Preconditions.checkNotNull(poolToInstanceConfigsMap,
"poolToInstanceConfigsMap is null");
+ int numPools = poolToInstanceConfigsMap.size();
+ Preconditions.checkState(numPools > 0, "No pool qualified for selection");
+
Preconditions.checkState(poolToInstanceConfigsMap.values().stream().map(List::size).reduce(Integer::sum)
+ .orElse(0) >= _numTargetTotalInstances,
+ "The total number of instances in all pools is less than the target
number of target instances");
+
+ HashSet<String> availableInstanceSet = new HashSet<>();
+ poolToInstanceConfigsMap.values().forEach(list -> list.forEach(i ->
availableInstanceSet.add(i.getInstanceName())));
+ LOGGER.info("Number of pools: {}", numPools);
+ LOGGER.info("Number of instances in all pools: {}",
availableInstanceSet.size());
+ LOGGER.info("availableInstanceSet: {}", availableInstanceSet);
+
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ List<String> instances =
_preConfiguredInstancePartitions.getInstances(0, i);
+ for (String instance : instances) {
+ Preconditions.checkState(availableInstanceSet.contains(instance),
+ "Instance %s in pre-configured instance partitions is not in "
+ + "the pool to instance configs map",
+ instance);
+ }
+ }
+
+ LOGGER.info("Validation passed. The instances provided can satisfy the
pool diverse requirement.");
+ LOGGER.info("Trying to assign total {} instances to {} replica groups, " +
"with {} instance per replica group",
+ _numTargetTotalInstances, _numTargetReplicaGroups,
_numTargetInstancesPerReplicaGroup);
+ }
+
+ void createListFromPreConfiguredInstanceAssignmentMap() {
+ List<List<String>> preConfiguredReplicaGroups = new
ArrayList<>(_numPreConfiguredReplicaGroups);
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+
preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0,
i));
+ }
+
+ for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) {
+ List<String> mirroredServerList = new ArrayList<>();
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ mirroredServerList.add(preConfiguredReplicaGroups.get(i).get(j));
+ }
+ _preConfiguredMirroredServerLists.add(mirroredServerList);
+ }
+ }
+
+ void createLookupTablesFromPreConfiguredInstanceAssignmentMap() {
+ List<List<String>> preConfiguredReplicaGroups = new
ArrayList<>(_numPreConfiguredReplicaGroups);
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+
preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0,
i));
+ }
+
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) {
+ String instance = preConfiguredReplicaGroups.get(i).get(j);
+ _preConfiguredInstanceNameToOffsetMap.put(instance, j);
+ }
+ }
+ }
+
+ @Override
+ public void selectInstances(Map<Integer, List<InstanceConfig>>
poolToInstanceConfigsMap,
+ InstancePartitions instancePartitions) {
+ if (_replicaGroupPartitionConfig.isReplicaGroupBased()) {
Review Comment:
Can we reorder the code as follows? An early exit will make it easier to read,
with less branching.
```
if (!_replicaGroupPartitionConfig.isReplicaGroupBased()) {
throw Exception(...)
}
....
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java:
##########
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Detailed design see
https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8
+ * During each creation/update/scale, the algorithm will refer to the
corresponding tenant level instance partitions and
+ * generate an instance partition by taking numInstancePerReplicaGroup mirror
server sets from the tenant level
+ * instance partitions.
+ *
+ * If an existingInstancePartition is provided, the algorithm will generate a
best effort assignment that resembles
+ * the existingInstancePartition.
+ *
+ * Assumptions for this algorithm:
+ * 1. The number of replica groups in the tenant level instance partitions is
the same as the number of replica groups
+ * in the table config.
+ * 2. The number of partitions at replica group level is 1
+ * 3. This algorithm only works for replica group based table assignment
+ */
+public class MirrorServerSetInstancePartitionSelector extends
InstancePartitionSelector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class);
+ private final InstancePartitions _preConfiguredInstancePartitions;
+
+ // dimensions of target instance partition
+ private final int _numTargetInstancesPerReplicaGroup;
+ private final int _numTargetReplicaGroups;
+ private final int _numTargetTotalInstances;
+
+ // dimensions of pre-configured instance partition
+ private int _numPreConfiguredReplicaGroups;
+ private int _numPreConfiguredInstancesPerReplicaGroup;
+
+ // dimensions of existing instance partition
+ private int _numExistingReplicaGroups;
+ private int _numExistingInstancesPerReplicaGroup;
+
+ // look up tables for pre-configured instance partition
+ private final List<List<String>> _preConfiguredMirroredServerLists = new
ArrayList<>();
+ private final Map<String, Integer> _preConfiguredInstanceNameToOffsetMap =
new HashMap<>();
+
+ private final List<List<String>> _existingMirroredServerLists = new
ArrayList<>();
+
+ public
MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig
replicaGroupPartitionConfig,
+ String tableNameWithType, @Nullable InstancePartitions
existingInstancePartitions,
+ InstancePartitions preConfiguredInstancePartitions) {
+ super(replicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
+ _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
+ _numTargetInstancesPerReplicaGroup =
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+ _numTargetReplicaGroups =
_replicaGroupPartitionConfig.getNumReplicaGroups();
+ _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup *
_numTargetReplicaGroups;
+ }
+
+ /**
+ * validate if the poolToInstanceConfigsMap is a valid input for
pre-configuration based replica-group selection
+ */
+ private void validatePoolDiversePreconditions(Map<Integer,
List<InstanceConfig>> poolToInstanceConfigsMap) {
+
+ LOGGER.info("Validating pre-configured instance partitions for
pre-configuration based replica-group selection");
+
+ // numTargetInstancesPerReplica should be positive
+ Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0,
+ "Number of instances per replica must be positive");
+ LOGGER.info("Number of instances per replica: {}",
_numTargetInstancesPerReplicaGroup);
+ // _numTargetReplicaGroups should be positive
+ Preconditions.checkState(_numTargetReplicaGroups > 0, "Number of
replica-groups must be positive");
+ LOGGER.info("Number of replica-groups: {}", _numTargetReplicaGroups);
+ // validate target partition count is 1
+ Preconditions.checkState(_replicaGroupPartitionConfig.getNumPartitions()
<= 1,
+ "This algorithm does not support table level partitioning for target
assignment");
+ LOGGER.info("Number of partitions: {}",
_replicaGroupPartitionConfig.getNumPartitions());
+
+ // Validate the existing instance partitions is null or has only one
partition
+ Preconditions.checkState(
+ (_existingInstancePartitions == null ||
_existingInstancePartitions.getNumPartitions() == 1),
+ "This algorithm does not support table level partitioning for existing
assignment");
+ LOGGER.info("Number of partitions in existing instance partitions: {}",
_existingInstancePartitions == null ? 0
+ : _existingInstancePartitions.getNumPartitions());
+
+ _numExistingReplicaGroups =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getNumReplicaGroups();
+ _numExistingInstancesPerReplicaGroup =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getInstances(0, 0).size();
+
+ // Validate the pre-configured instance partitions is not null and has
only one partition
+ Preconditions.checkState(_preConfiguredInstancePartitions != null,
+ "Pre-configured instance partitions must be provided for
pre-configuration based selection");
+
Preconditions.checkState(_preConfiguredInstancePartitions.getNumPartitions() ==
1,
+ "This algorithm does not support table level partitioning for
pre-configured assignment");
+ LOGGER.info("Number of partitions in pre-configured instance partitions:
{}", _preConfiguredInstancePartitions
+ .getNumPartitions());
+
+ // Validate the number of replica-groups in the pre-configured instance
partitions is equal to the target
+ // number of replica-groups
+ _numPreConfiguredReplicaGroups =
_preConfiguredInstancePartitions.getNumReplicaGroups();
+ Preconditions.checkState(_numPreConfiguredReplicaGroups ==
_numTargetReplicaGroups,
+ "The number of replica-groups %s in the pre-configured instance
partitions "
+ + "is not equal to the target number of replica-groups %s",
_numPreConfiguredReplicaGroups,
+ _numTargetReplicaGroups);
+ LOGGER.info("Number of replica-groups in pre-configured instance
partitions: {}", _numPreConfiguredReplicaGroups);
+
+ // Validate the number of instances per replica-group in the
pre-configured instance partitions is greater than or
+ // equal to the target number of instances per replica-group
+ _numPreConfiguredInstancesPerReplicaGroup =
_preConfiguredInstancePartitions.getInstances(0, 0).size();
+ Preconditions.checkState(_numPreConfiguredInstancesPerReplicaGroup >=
_numTargetInstancesPerReplicaGroup,
+ "The number of instances per replica-group in the pre-configured "
+ + "instance partitions is less than the target number of instances
per replica-group");
+ LOGGER.info("Number of instances per replica-group in pre-configured
instance partitions: {}",
+ _numPreConfiguredInstancesPerReplicaGroup);
+
+ // Validate the pool to instance configs map is not null or empty
+ Preconditions.checkNotNull(poolToInstanceConfigsMap,
"poolToInstanceConfigsMap is null");
+ int numPools = poolToInstanceConfigsMap.size();
+ Preconditions.checkState(numPools > 0, "No pool qualified for selection");
+
Preconditions.checkState(poolToInstanceConfigsMap.values().stream().map(List::size).reduce(Integer::sum)
+ .orElse(0) >= _numTargetTotalInstances,
+ "The total number of instances in all pools is less than the target
number of target instances");
+
+ HashSet<String> availableInstanceSet = new HashSet<>();
+ poolToInstanceConfigsMap.values().forEach(list -> list.forEach(i ->
availableInstanceSet.add(i.getInstanceName())));
+ LOGGER.info("Number of pools: {}", numPools);
+ LOGGER.info("Number of instances in all pools: {}",
availableInstanceSet.size());
+ LOGGER.info("availableInstanceSet: {}", availableInstanceSet);
+
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ List<String> instances =
_preConfiguredInstancePartitions.getInstances(0, i);
+ for (String instance : instances) {
+ Preconditions.checkState(availableInstanceSet.contains(instance),
+ "Instance %s in pre-configured instance partitions is not in "
+ + "the pool to instance configs map",
+ instance);
+ }
+ }
+
+ LOGGER.info("Validation passed. The instances provided can satisfy the
pool diverse requirement.");
+ LOGGER.info("Trying to assign total {} instances to {} replica groups, " +
"with {} instance per replica group",
+ _numTargetTotalInstances, _numTargetReplicaGroups,
_numTargetInstancesPerReplicaGroup);
+ }
+
+ void createListFromPreConfiguredInstanceAssignmentMap() {
+ List<List<String>> preConfiguredReplicaGroups = new
ArrayList<>(_numPreConfiguredReplicaGroups);
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+
preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0,
i));
+ }
+
+ for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) {
+ List<String> mirroredServerList = new ArrayList<>();
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ mirroredServerList.add(preConfiguredReplicaGroups.get(i).get(j));
+ }
+ _preConfiguredMirroredServerLists.add(mirroredServerList);
+ }
+ }
+
+ void createLookupTablesFromPreConfiguredInstanceAssignmentMap() {
Review Comment:
Add explicit public/private access modifiers to all functions that are missing them.
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java:
##########
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Detailed design see
https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8
+ * During each creation/update/scale, the algorithm will refer to the
corresponding tenant level instance partitions and
+ * generate an instance partition by taking numInstancePerReplicaGroup mirror
server sets from the tenant level
+ * instance partitions.
+ *
+ * If an existingInstancePartition is provided, the algorithm will generate a
best effort assignment that resembles
+ * the existingInstancePartition.
+ *
+ * Assumptions for this algorithm:
+ * 1. The number of replica groups in the tenant level instance partitions is
the same as the number of replica groups
+ * in the table config.
+ * 2. The number of partitions at replica group level is 1
+ * 3. This algorithm only works for replica group based table assignment
+ */
+public class MirrorServerSetInstancePartitionSelector extends
InstancePartitionSelector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class);
+ private final InstancePartitions _preConfiguredInstancePartitions;
+
+ // dimensions of target instance partition
+ private final int _numTargetInstancesPerReplicaGroup;
+ private final int _numTargetReplicaGroups;
+ private final int _numTargetTotalInstances;
+
+ // dimensions of pre-configured instance partition
+ private int _numPreConfiguredReplicaGroups;
+ private int _numPreConfiguredInstancesPerReplicaGroup;
+
+ // dimensions of existing instance partition
+ private int _numExistingReplicaGroups;
+ private int _numExistingInstancesPerReplicaGroup;
+
+ // look up tables for pre-configured instance partition
+ private final List<List<String>> _preConfiguredMirroredServerLists = new
ArrayList<>();
+ private final Map<String, Integer> _preConfiguredInstanceNameToOffsetMap =
new HashMap<>();
+
+ private final List<List<String>> _existingMirroredServerLists = new
ArrayList<>();
+
+ public
MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig
replicaGroupPartitionConfig,
+ String tableNameWithType, @Nullable InstancePartitions
existingInstancePartitions,
+ InstancePartitions preConfiguredInstancePartitions) {
+ super(replicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
+ _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
+ _numTargetInstancesPerReplicaGroup =
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+ _numTargetReplicaGroups =
_replicaGroupPartitionConfig.getNumReplicaGroups();
+ _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup *
_numTargetReplicaGroups;
+ }
+
+ /**
+ * validate if the poolToInstanceConfigsMap is a valid input for
pre-configuration based replica-group selection
+ */
+ private void validatePoolDiversePreconditions(Map<Integer,
List<InstanceConfig>> poolToInstanceConfigsMap) {
+
+ LOGGER.info("Validating pre-configured instance partitions for
pre-configuration based replica-group selection");
+
+ // numTargetInstancesPerReplica should be positive
+ Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0,
+ "Number of instances per replica must be positive");
+ LOGGER.info("Number of instances per replica: {}",
_numTargetInstancesPerReplicaGroup);
+ // _numTargetReplicaGroups should be positive
+ Preconditions.checkState(_numTargetReplicaGroups > 0, "Number of
replica-groups must be positive");
+ LOGGER.info("Number of replica-groups: {}", _numTargetReplicaGroups);
+ // validate target partition count is 1
+ Preconditions.checkState(_replicaGroupPartitionConfig.getNumPartitions()
<= 1,
+ "This algorithm does not support table level partitioning for target
assignment");
+ LOGGER.info("Number of partitions: {}",
_replicaGroupPartitionConfig.getNumPartitions());
+
+ // Validate the existing instance partitions is null or has only one
partition
+ Preconditions.checkState(
+ (_existingInstancePartitions == null ||
_existingInstancePartitions.getNumPartitions() == 1),
+ "This algorithm does not support table level partitioning for existing
assignment");
+ LOGGER.info("Number of partitions in existing instance partitions: {}",
_existingInstancePartitions == null ? 0
+ : _existingInstancePartitions.getNumPartitions());
+
+ _numExistingReplicaGroups =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getNumReplicaGroups();
+ _numExistingInstancesPerReplicaGroup =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getInstances(0, 0).size();
+
+ // Validate the pre-configured instance partitions is not null and has
only one partition
+ Preconditions.checkState(_preConfiguredInstancePartitions != null,
+ "Pre-configured instance partitions must be provided for
pre-configuration based selection");
+
Preconditions.checkState(_preConfiguredInstancePartitions.getNumPartitions() ==
1,
+ "This algorithm does not support table level partitioning for
pre-configured assignment");
+ LOGGER.info("Number of partitions in pre-configured instance partitions:
{}", _preConfiguredInstancePartitions
+ .getNumPartitions());
+
+ // Validate the number of replica-groups in the pre-configured instance
partitions is equal to the target
+ // number of replica-groups
+ _numPreConfiguredReplicaGroups =
_preConfiguredInstancePartitions.getNumReplicaGroups();
+ Preconditions.checkState(_numPreConfiguredReplicaGroups ==
_numTargetReplicaGroups,
+ "The number of replica-groups %s in the pre-configured instance
partitions "
+ + "is not equal to the target number of replica-groups %s",
_numPreConfiguredReplicaGroups,
+ _numTargetReplicaGroups);
+ LOGGER.info("Number of replica-groups in pre-configured instance
partitions: {}", _numPreConfiguredReplicaGroups);
+
+ // Validate the number of instances per replica-group in the
pre-configured instance partitions is greater than or
+ // equal to the target number of instances per replica-group
+ _numPreConfiguredInstancesPerReplicaGroup =
_preConfiguredInstancePartitions.getInstances(0, 0).size();
+ Preconditions.checkState(_numPreConfiguredInstancesPerReplicaGroup >=
_numTargetInstancesPerReplicaGroup,
+ "The number of instances per replica-group in the pre-configured "
+ + "instance partitions is less than the target number of instances
per replica-group");
+ LOGGER.info("Number of instances per replica-group in pre-configured
instance partitions: {}",
+ _numPreConfiguredInstancesPerReplicaGroup);
+
+ // Validate the pool to instance configs map is not null or empty
+ Preconditions.checkNotNull(poolToInstanceConfigsMap,
"poolToInstanceConfigsMap is null");
+ int numPools = poolToInstanceConfigsMap.size();
+ Preconditions.checkState(numPools > 0, "No pool qualified for selection");
+
Preconditions.checkState(poolToInstanceConfigsMap.values().stream().map(List::size).reduce(Integer::sum)
+ .orElse(0) >= _numTargetTotalInstances,
+ "The total number of instances in all pools is less than the target
number of target instances");
+
+ HashSet<String> availableInstanceSet = new HashSet<>();
+ poolToInstanceConfigsMap.values().forEach(list -> list.forEach(i ->
availableInstanceSet.add(i.getInstanceName())));
+ LOGGER.info("Number of pools: {}", numPools);
+ LOGGER.info("Number of instances in all pools: {}",
availableInstanceSet.size());
+ LOGGER.info("availableInstanceSet: {}", availableInstanceSet);
+
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ List<String> instances =
_preConfiguredInstancePartitions.getInstances(0, i);
+ for (String instance : instances) {
+ Preconditions.checkState(availableInstanceSet.contains(instance),
+ "Instance %s in pre-configured instance partitions is not in "
+ + "the pool to instance configs map",
+ instance);
+ }
+ }
+
+ LOGGER.info("Validation passed. The instances provided can satisfy the
pool diverse requirement.");
+ LOGGER.info("Trying to assign total {} instances to {} replica groups, " +
"with {} instance per replica group",
+ _numTargetTotalInstances, _numTargetReplicaGroups,
_numTargetInstancesPerReplicaGroup);
+ }
+
+ void createListFromPreConfiguredInstanceAssignmentMap() {
+ List<List<String>> preConfiguredReplicaGroups = new
ArrayList<>(_numPreConfiguredReplicaGroups);
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+
preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0,
i));
+ }
+
+ for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) {
+ List<String> mirroredServerList = new ArrayList<>();
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ mirroredServerList.add(preConfiguredReplicaGroups.get(i).get(j));
+ }
+ _preConfiguredMirroredServerLists.add(mirroredServerList);
+ }
+ }
+
+ void createLookupTablesFromPreConfiguredInstanceAssignmentMap() {
+ List<List<String>> preConfiguredReplicaGroups = new
ArrayList<>(_numPreConfiguredReplicaGroups);
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+
preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0,
i));
+ }
+
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) {
+ String instance = preConfiguredReplicaGroups.get(i).get(j);
+ _preConfiguredInstanceNameToOffsetMap.put(instance, j);
+ }
+ }
+ }
+
+ @Override
+ public void selectInstances(Map<Integer, List<InstanceConfig>>
poolToInstanceConfigsMap,
+ InstancePartitions instancePartitions) {
+ if (_replicaGroupPartitionConfig.isReplicaGroupBased()) {
+ validatePoolDiversePreconditions(poolToInstanceConfigsMap);
+ if (_existingInstancePartitions == null) {
+ // If no existing instance partitions, create new instance partitions
based on the pre-configured instance
+ // partitions. This is done by just selecting
_targetNumInstancesPerReplicaGroup set of mirrored servers
+ // from the pre-configured instance partitions.
+ LOGGER.info("No existing instance partitions found. Will build new on
top of"
+ + " the pre-configured instance partitions");
+ // create a list of lists of mirrored servers from the pre-configured
instance partitions
+ createListFromPreConfiguredInstanceAssignmentMap();
+ // shuffle the list of lists of mirrored servers based on the table
name hash
+ int tableNameHash = Math.abs(_tableNameWithType.hashCode());
+ // initialize a list of indices from 0 to
_numPreConfiguredInstancesPerReplicaGroup
+ List<Integer> shuffledIndex = new
ArrayList<>(_numPreConfiguredInstancesPerReplicaGroup);
+ for (int i = 0; i < _numPreConfiguredInstancesPerReplicaGroup; i++) {
+ shuffledIndex.add(i);
+ }
+ // shuffle the list of indices based on the table name hash
+ Collections.shuffle(shuffledIndex, new Random(tableNameHash));
+ // select the first _numTargetInstancesPerReplicaGroup indices
+ shuffledIndex = shuffledIndex.subList(0,
_numTargetInstancesPerReplicaGroup);
+ // sort the list of indices so that they follow the original order of
the pre-configured instance partitions
+ shuffledIndex.sort(Comparator.naturalOrder());
+
+ // create the instance partitions based on the shuffled list of
mirrored servers
+ List<List<String>> resultReplicaGroups = new
ArrayList<>(_numTargetReplicaGroups);
+ for (int i = 0; i < _numTargetReplicaGroups; i++) {
+ resultReplicaGroups.add(new
ArrayList<>(_numTargetInstancesPerReplicaGroup));
+ }
+
+ // populate the instance partitions with the selected mirrored servers
+ for (int j = 0; j < _numTargetInstancesPerReplicaGroup; j++) {
+ for (int i = 0; i < _numTargetReplicaGroups; i++) {
+
resultReplicaGroups.get(i).add(_preConfiguredMirroredServerLists.get(shuffledIndex.get(j)).get(i));
+ }
+ }
+ for (int i = 0; i < _numTargetReplicaGroups; i++) {
+ instancePartitions.setInstances(0, i, resultReplicaGroups.get(i));
+ }
+ } else {
+ // If existing instance partitions exist, adjust the existing instance
partitions based on the pre-configured
+ // instance partitions. This code path takes care of instance
replacement, uplift, and downlift.
+ // This is done by search in the pre-configured instance partitions
for the mirrored
+ // servers sets that are similar to the existing sets in instance
partitions.
+ LOGGER.info("Existing instance partitions found. Will adjust the
existing instance partitions"
+ + " based on the pre-configured instance partitions");
+ createListFromPreConfiguredInstanceAssignmentMap();
+ createLookupTablesFromPreConfiguredInstanceAssignmentMap();
+ createListAndLookupTablesFromExistingInstancePartitions();
+ Set<Integer> usedPreconfiguredInstanceOffsets = new HashSet<>();
+ Map<Integer, Map.Entry<Integer, Long>> existingOffsetToResultTuple =
new HashMap<>();
+
+ // For each instance offset, find the mirrored server that is most
similar to the existing mirrored server
+ // set. If this mirrored server is not used, add it to the result list.
+ for (int j = 0; j < _numExistingInstancesPerReplicaGroup; j++) {
+ List<String> existingMirroredServers =
_existingMirroredServerLists.get(j);
+ int finalJ = j;
+ existingMirroredServers.stream()
+ .map(_preConfiguredInstanceNameToOffsetMap::get)
+ .filter(Objects::nonNull)
+ .filter(offset ->
!usedPreconfiguredInstanceOffsets.contains(offset))
+ .collect(Collectors.groupingBy(Function.identity(),
Collectors.counting()))
+
.entrySet().stream().max(Map.Entry.comparingByValue()).ifPresent(e -> {
+ existingOffsetToResultTuple.put(finalJ, e);
+ usedPreconfiguredInstanceOffsets.add(e.getKey());
+ });
+ }
+
+ if (_numExistingInstancesPerReplicaGroup >
_numTargetInstancesPerReplicaGroup) {
+ // If this is a downlift case
+ List<Map.Entry<Integer, Long>> collect =
existingOffsetToResultTuple.values()
+ .stream()
+ .sorted((a, b) -> b.getValue().compareTo(a.getValue()))
+ .limit(_numTargetInstancesPerReplicaGroup)
+ .collect(Collectors.toList());
+ int size = collect.size();
+ existingOffsetToResultTuple.clear();
+ usedPreconfiguredInstanceOffsets.clear();
+ for (int j = 0; j < size; j++) {
+ existingOffsetToResultTuple.put(j, collect.get(j));
+ usedPreconfiguredInstanceOffsets.add(collect.get(j).getKey());
+ }
+ }
+
+ if (existingOffsetToResultTuple.size() <
_numTargetInstancesPerReplicaGroup) {
+ // If the number of instances selected from the result list is less
than the target number
+ // of instances per replica group, add the remaining instances from
the pre-configured instance partitions.
+ ArrayList<Integer> shuffledOffsets = new
ArrayList<>(_numPreConfiguredInstancesPerReplicaGroup);
+ for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) {
+ shuffledOffsets.add(j);
+ }
+ // Commenting this out as
+ // (1) Shuffling is already done in the initial step.
+ // (2) We want to keep the order of the pre-configured instance
partitions, so that the segment assignment
+ // strategy for single tenant cluster can be minimized-impact.
+ // But keeping the code here in case we want to have a specific
reordering strategy in the future.
+ // Collections.shuffle(shuffledOffsets, new
Random(Math.abs(_tableNameWithType.hashCode())));
+ for (int k = 0, j = 0; j < _numTargetInstancesPerReplicaGroup; j++) {
+ if (existingOffsetToResultTuple.containsKey(j)) {
+ continue;
+ }
+ while
(usedPreconfiguredInstanceOffsets.contains(shuffledOffsets.get(k))) {
+ k++;
+ }
+ Integer offset = shuffledOffsets.get(k);
+ existingOffsetToResultTuple.put(j, new
AbstractMap.SimpleEntry<>(offset, 0L));
+ usedPreconfiguredInstanceOffsets.add(offset);
+ }
+ }
+
+ List<List<String>> resultReplicaGroups = new
ArrayList<>(_numTargetReplicaGroups);
+ for (int i = 0; i < _numTargetReplicaGroups; i++) {
+ resultReplicaGroups.add(new
ArrayList<>(_numTargetInstancesPerReplicaGroup));
+ }
+ for (int j = 0; j < _numTargetInstancesPerReplicaGroup; j++) {
+ List<String> mirrorServers =
+
_preConfiguredMirroredServerLists.get(existingOffsetToResultTuple.get(j).getKey());
+ for (int i = 0; i < _numTargetReplicaGroups; i++) {
+ resultReplicaGroups.get(i).add(mirrorServers.get(i));
+ }
+ }
+ for (int i = 0; i < _numTargetReplicaGroups; i++) {
+ instancePartitions.setInstances(0, i, resultReplicaGroups.get(i));
+ }
+ }
+ } else {
+ throw new IllegalStateException("Does not support Non-replica-group
based selection");
+ }
+ }
+
+ private void createListAndLookupTablesFromExistingInstancePartitions() {
Review Comment:
```suggestion
private void createExistingMirrorServersList() {
```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/MirrorServerSetInstancePartitionSelector.java:
##########
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.base.Preconditions;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Detailed design see
https://docs.google.com/document/d/1xxPkGPxyY21gAkFi9gtFDeSzEXjPjp-IQW70kHynsL8
+ * During each creation/update/scale, the algorithm will refer to the
corresponding tenant level instance partitions and
+ * generate an instance partition by taking numInstancePerReplicaGroup mirror
server sets from the tenant level
+ * instance partitions.
+ *
+ * If an existingInstancePartition is provided, the algorithm will generate a
best effort assignment that resembles
+ * the existingInstancePartition.
+ *
+ * Assumptions for this algorithm:
+ * 1. The number of replica groups in the tenant level instance partitions is
the same as the number of replica groups
+ * in the table config.
+ * 2. The number of partitions at replica group level is 1
+ * 3. This algorithm only works for replica group based table assignment
+ */
+public class MirrorServerSetInstancePartitionSelector extends
InstancePartitionSelector {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(MirrorServerSetInstancePartitionSelector.class);
+ private final InstancePartitions _preConfiguredInstancePartitions;
+
+ // dimensions of target instance partition
+ private final int _numTargetInstancesPerReplicaGroup;
+ private final int _numTargetReplicaGroups;
+ private final int _numTargetTotalInstances;
+
+ // dimensions of pre-configured instance partition
+ private int _numPreConfiguredReplicaGroups;
+ private int _numPreConfiguredInstancesPerReplicaGroup;
+
+ // dimensions of existing instance partition
+ private int _numExistingReplicaGroups;
+ private int _numExistingInstancesPerReplicaGroup;
+
+ // look up tables for pre-configured instance partition
+ private final List<List<String>> _preConfiguredMirroredServerLists = new
ArrayList<>();
+ private final Map<String, Integer> _preConfiguredInstanceNameToOffsetMap =
new HashMap<>();
+
+ private final List<List<String>> _existingMirroredServerLists = new
ArrayList<>();
+
+ public
MirrorServerSetInstancePartitionSelector(InstanceReplicaGroupPartitionConfig
replicaGroupPartitionConfig,
+ String tableNameWithType, @Nullable InstancePartitions
existingInstancePartitions,
+ InstancePartitions preConfiguredInstancePartitions) {
+ super(replicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions);
+ _preConfiguredInstancePartitions = preConfiguredInstancePartitions;
+ _numTargetInstancesPerReplicaGroup =
_replicaGroupPartitionConfig.getNumInstancesPerReplicaGroup();
+ _numTargetReplicaGroups =
_replicaGroupPartitionConfig.getNumReplicaGroups();
+ _numTargetTotalInstances = _numTargetInstancesPerReplicaGroup *
_numTargetReplicaGroups;
+ }
+
+ /**
+ * validate if the poolToInstanceConfigsMap is a valid input for
pre-configuration based replica-group selection
+ */
+ private void validatePoolDiversePreconditions(Map<Integer,
List<InstanceConfig>> poolToInstanceConfigsMap) {
+
+ LOGGER.info("Validating pre-configured instance partitions for
pre-configuration based replica-group selection");
+
+ // numTargetInstancesPerReplica should be positive
+ Preconditions.checkState(_numTargetInstancesPerReplicaGroup > 0,
+ "Number of instances per replica must be positive");
+ LOGGER.info("Number of instances per replica: {}",
_numTargetInstancesPerReplicaGroup);
+ // _numTargetReplicaGroups should be positive
+ Preconditions.checkState(_numTargetReplicaGroups > 0, "Number of
replica-groups must be positive");
+ LOGGER.info("Number of replica-groups: {}", _numTargetReplicaGroups);
+ // validate target partition count is 1
+ Preconditions.checkState(_replicaGroupPartitionConfig.getNumPartitions()
<= 1,
+ "This algorithm does not support table level partitioning for target
assignment");
+ LOGGER.info("Number of partitions: {}",
_replicaGroupPartitionConfig.getNumPartitions());
+
+ // Validate the existing instance partitions is null or has only one
partition
+ Preconditions.checkState(
+ (_existingInstancePartitions == null ||
_existingInstancePartitions.getNumPartitions() == 1),
+ "This algorithm does not support table level partitioning for existing
assignment");
+ LOGGER.info("Number of partitions in existing instance partitions: {}",
_existingInstancePartitions == null ? 0
+ : _existingInstancePartitions.getNumPartitions());
+
+ _numExistingReplicaGroups =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getNumReplicaGroups();
+ _numExistingInstancesPerReplicaGroup =
+ _existingInstancePartitions == null ? 0 :
_existingInstancePartitions.getInstances(0, 0).size();
+
+ // Validate the pre-configured instance partitions is not null and has
only one partition
+ Preconditions.checkState(_preConfiguredInstancePartitions != null,
+ "Pre-configured instance partitions must be provided for
pre-configuration based selection");
+
Preconditions.checkState(_preConfiguredInstancePartitions.getNumPartitions() ==
1,
+ "This algorithm does not support table level partitioning for
pre-configured assignment");
+ LOGGER.info("Number of partitions in pre-configured instance partitions:
{}", _preConfiguredInstancePartitions
+ .getNumPartitions());
+
+ // Validate the number of replica-groups in the pre-configured instance
partitions is equal to the target
+ // number of replica-groups
+ _numPreConfiguredReplicaGroups =
_preConfiguredInstancePartitions.getNumReplicaGroups();
+ Preconditions.checkState(_numPreConfiguredReplicaGroups ==
_numTargetReplicaGroups,
+ "The number of replica-groups %s in the pre-configured instance
partitions "
+ + "is not equal to the target number of replica-groups %s",
_numPreConfiguredReplicaGroups,
+ _numTargetReplicaGroups);
+ LOGGER.info("Number of replica-groups in pre-configured instance
partitions: {}", _numPreConfiguredReplicaGroups);
+
+ // Validate the number of instances per replica-group in the
pre-configured instance partitions is greater than or
+ // equal to the target number of instances per replica-group
+ _numPreConfiguredInstancesPerReplicaGroup =
_preConfiguredInstancePartitions.getInstances(0, 0).size();
+ Preconditions.checkState(_numPreConfiguredInstancesPerReplicaGroup >=
_numTargetInstancesPerReplicaGroup,
+ "The number of instances per replica-group in the pre-configured "
+ + "instance partitions is less than the target number of instances
per replica-group");
+ LOGGER.info("Number of instances per replica-group in pre-configured
instance partitions: {}",
+ _numPreConfiguredInstancesPerReplicaGroup);
+
+ // Validate the pool to instance configs map is not null or empty
+ Preconditions.checkNotNull(poolToInstanceConfigsMap,
"poolToInstanceConfigsMap is null");
+ int numPools = poolToInstanceConfigsMap.size();
+ Preconditions.checkState(numPools > 0, "No pool qualified for selection");
+
Preconditions.checkState(poolToInstanceConfigsMap.values().stream().map(List::size).reduce(Integer::sum)
+ .orElse(0) >= _numTargetTotalInstances,
+ "The total number of instances in all pools is less than the target
number of target instances");
+
+ HashSet<String> availableInstanceSet = new HashSet<>();
+ poolToInstanceConfigsMap.values().forEach(list -> list.forEach(i ->
availableInstanceSet.add(i.getInstanceName())));
+ LOGGER.info("Number of pools: {}", numPools);
+ LOGGER.info("Number of instances in all pools: {}",
availableInstanceSet.size());
+ LOGGER.info("availableInstanceSet: {}", availableInstanceSet);
+
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ List<String> instances =
_preConfiguredInstancePartitions.getInstances(0, i);
+ for (String instance : instances) {
+ Preconditions.checkState(availableInstanceSet.contains(instance),
+ "Instance %s in pre-configured instance partitions is not in "
+ + "the pool to instance configs map",
+ instance);
+ }
+ }
+
+ LOGGER.info("Validation passed. The instances provided can satisfy the
pool diverse requirement.");
+ LOGGER.info("Trying to assign total {} instances to {} replica groups, " +
"with {} instance per replica group",
+ _numTargetTotalInstances, _numTargetReplicaGroups,
_numTargetInstancesPerReplicaGroup);
+ }
+
+ void createListFromPreConfiguredInstanceAssignmentMap() {
+ List<List<String>> preConfiguredReplicaGroups = new
ArrayList<>(_numPreConfiguredReplicaGroups);
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+
preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0,
i));
+ }
+
+ for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) {
+ List<String> mirroredServerList = new ArrayList<>();
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ mirroredServerList.add(preConfiguredReplicaGroups.get(i).get(j));
+ }
+ _preConfiguredMirroredServerLists.add(mirroredServerList);
+ }
+ }
+
+ void createLookupTablesFromPreConfiguredInstanceAssignmentMap() {
+ List<List<String>> preConfiguredReplicaGroups = new
ArrayList<>(_numPreConfiguredReplicaGroups);
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+
preConfiguredReplicaGroups.add(_preConfiguredInstancePartitions.getInstances(0,
i));
+ }
+
+ for (int i = 0; i < _numPreConfiguredReplicaGroups; i++) {
+ for (int j = 0; j < _numPreConfiguredInstancesPerReplicaGroup; j++) {
+ String instance = preConfiguredReplicaGroups.get(i).get(j);
+ _preConfiguredInstanceNameToOffsetMap.put(instance, j);
+ }
+ }
+ }
+
+ @Override
+ public void selectInstances(Map<Integer, List<InstanceConfig>>
poolToInstanceConfigsMap,
+ InstancePartitions instancePartitions) {
+ if (_replicaGroupPartitionConfig.isReplicaGroupBased()) {
+ validatePoolDiversePreconditions(poolToInstanceConfigsMap);
+ if (_existingInstancePartitions == null) {
Review Comment:
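Consider splitting selectInstances() into separate helper methods: one for a new assignment (no existing instance partitions) and one for adjusting existing instance partitions (instance replacement, uplift, and downlift).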
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]