This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 95b075b527 Batch reload api to specify what segments to be reloaded on
what servers to be more flexible (#14544)
95b075b527 is described below
commit 95b075b52765088e1a27cc420f49a468bcd789bb
Author: Xiaobing <[email protected]>
AuthorDate: Thu Dec 5 06:57:25 2024 -0800
Batch reload api to specify what segments to be reloaded on what servers to
be more flexible (#14544)
* extend the existing reload-all-segments API to make it more flexible, by
taking a map to reload different batches of segments on different instances
---
.../api/resources/PinotSegmentRestletResource.java | 129 +++++++++++++++++----
.../helix/core/PinotHelixResourceManager.java | 93 +++++++++++----
.../resources/PinotSegmentRestletResourceTest.java | 88 ++++++++++++++
.../segment/spi/creator/name/SegmentNameUtils.java | 2 +
.../api/resources/ControllerJobStatusResource.java | 53 ++++-----
.../apache/pinot/spi/utils/CommonConstants.java | 1 +
6 files changed, 290 insertions(+), 76 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 7499098780..37e365bc7f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -18,8 +18,9 @@
*/
package org.apache.pinot.controller.api.resources;
-import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.collect.BiMap;
@@ -39,6 +40,7 @@ import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
@@ -87,6 +89,7 @@ import org.apache.pinot.controller.util.TableTierReader;
import org.apache.pinot.core.auth.Actions;
import org.apache.pinot.core.auth.Authorize;
import org.apache.pinot.core.auth.TargetType;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.data.DateTimeFieldSpec;
@@ -403,8 +406,8 @@ public class PinotSegmentRestletResource {
int numReloadMsgSent = msgInfo.getLeft();
if (numReloadMsgSent > 0) {
try {
- if
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType,
segmentName, msgInfo.getRight(),
- startTimeMs, numReloadMsgSent)) {
+ if
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType,
segmentName, targetInstance,
+ msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
zkJobMetaWriteSuccess = true;
} else {
LOGGER.error("Failed to add reload segment job meta into zookeeper
for table: {}, segment: {}",
@@ -533,20 +536,11 @@ public class PinotSegmentRestletResource {
}
String tableNameWithType =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE);
- Map<String, List<String>> serverToSegments;
-
- String singleSegmentName =
+ String segmentNames =
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME);
- if (singleSegmentName != null) {
- // No need to query servers where this segment is not supposed to be
hosted
- serverToSegments = new TreeMap<>();
- List<String> segmentList = Collections.singletonList(singleSegmentName);
- _pinotHelixResourceManager.getServers(tableNameWithType,
singleSegmentName).forEach(server -> {
- serverToSegments.put(server, segmentList);
- });
- } else {
- serverToSegments =
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
- }
+ String instanceName =
+
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME);
+ Map<String, List<String>> serverToSegments =
getServerToSegments(tableNameWithType, segmentNames, instanceName);
BiMap<String, String> serverEndPoints =
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegments.keySet());
@@ -554,13 +548,16 @@ public class PinotSegmentRestletResource {
new CompletionServiceHelper(_executor, _connectionManager,
serverEndPoints);
List<String> serverUrls = new ArrayList<>();
- BiMap<String, String> endpointsToServers = serverEndPoints.inverse();
- for (String endpoint : endpointsToServers.keySet()) {
+ for (Map.Entry<String, String> entry : serverEndPoints.entrySet()) {
+ String server = entry.getKey();
+ String endpoint = entry.getValue();
String reloadTaskStatusEndpoint =
endpoint + "/controllerJob/reloadStatus/" + tableNameWithType +
"?reloadJobTimestamp="
+
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
- if (singleSegmentName != null) {
- reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName="
+ singleSegmentName;
+ if (segmentNames != null) {
+ List<String> targetSegments = serverToSegments.get(server);
+ reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName="
+ StringUtils.join(targetSegments,
+ SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
}
serverUrls.add(reloadTaskStatusEndpoint);
}
@@ -615,6 +612,31 @@ public class PinotSegmentRestletResource {
return serverReloadControllerJobStatusResponse;
}
+ @VisibleForTesting
+ Map<String, List<String>> getServerToSegments(String tableNameWithType,
@Nullable String segmentNames,
+ @Nullable String instanceName) {
+ if (segmentNames == null) {
+ // instanceName may or may not be null; the method called below
handles both cases.
+ return
_pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType,
instanceName);
+ }
+ // Skip servers and segments not involved in the segment reloading job.
+ List<String> segmnetNameList = new ArrayList<>();
+ Collections.addAll(segmnetNameList, StringUtils.split(segmentNames,
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
+ if (instanceName != null) {
+ return Map.of(instanceName, segmnetNameList);
+ }
+ // If instance is null, then either one or all segments are being reloaded
via current segment reload restful APIs.
+ // And the if-check at the beginning of this method has handled the case
of reloading all segments. So here we
+ // expect only one segment name.
+ Preconditions.checkState(segmnetNameList.size() == 1, "Only one segment is
expected but got: %s", segmnetNameList);
+ Map<String, List<String>> serverToSegments = new HashMap<>();
+ Set<String> servers =
_pinotHelixResourceManager.getServers(tableNameWithType, segmentNames);
+ for (String server : servers) {
+ serverToSegments.put(server, Collections.singletonList(segmentNames));
+ }
+ return serverToSegments;
+ }
+
@POST
@Path("segments/{tableName}/reload")
@Authorize(targetType = TargetType.TABLE, paramName = "tableName", action =
Actions.Table.RELOAD_SEGMENT)
@@ -627,10 +649,11 @@ public class PinotSegmentRestletResource {
@ApiParam(value = "Whether to force server to download segment")
@QueryParam("forceDownload")
@DefaultValue("false") boolean forceDownload,
@ApiParam(value = "Name of the target instance to reload")
@QueryParam("targetInstance") @Nullable
- String targetInstance, @Context HttpHeaders headers)
- throws JsonProcessingException {
+ String targetInstance,
+ @ApiParam(value = "Map from instances to segments to reload. This param
takes precedence over targetInstance")
+ @QueryParam("instanceToSegmentsMap") @Nullable String
instanceToSegmentsMapInJson, @Context HttpHeaders headers)
+ throws IOException {
tableName = DatabaseUtils.translateTableName(tableName, headers);
- long startTimeMs = System.currentTimeMillis();
TableType tableTypeFromTableName =
TableNameBuilder.getTableTypeFromTableName(tableName);
TableType tableTypeFromRequest = Constants.validateTableType(tableTypeStr);
// When rawTableName is provided but w/o table type, Pinot tries to reload
both OFFLINE
@@ -644,6 +667,20 @@ public class PinotSegmentRestletResource {
List<String> tableNamesWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableTypeFromRequest,
LOGGER);
+ if (instanceToSegmentsMapInJson != null) {
+ Map<String, List<String>> instanceToSegmentsMap =
+ JsonUtils.stringToObject(instanceToSegmentsMapInJson, new
TypeReference<>() {
+ });
+ Map<String, Map<String, Map<String, String>>> tableInstanceMsgData =
+ reloadSegments(tableNamesWithType, forceDownload,
instanceToSegmentsMap);
+ if (tableInstanceMsgData.isEmpty()) {
+ throw new ControllerApplicationException(LOGGER,
+ String.format("Failed to find any segments in table: %s with
instanceToSegmentsMap: %s", tableName,
+ instanceToSegmentsMap), Status.NOT_FOUND);
+ }
+ return new
SuccessResponse(JsonUtils.objectToString(tableInstanceMsgData));
+ }
+ long startTimeMs = System.currentTimeMillis();
Map<String, Map<String, String>> perTableMsgData = new LinkedHashMap<>();
for (String tableNameWithType : tableNamesWithType) {
Pair<Integer, String> msgInfo =
@@ -658,8 +695,8 @@ public class PinotSegmentRestletResource {
perTableMsgData.put(tableNameWithType, tableReloadMeta);
// Store in ZK
try {
- if
(_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType,
msgInfo.getRight(), startTimeMs,
- numReloadMsgSent)) {
+ if
(_pinotHelixResourceManager.addNewReloadAllSegmentsJob(tableNameWithType,
targetInstance, msgInfo.getRight(),
+ startTimeMs, numReloadMsgSent)) {
tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
} else {
tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
@@ -678,6 +715,48 @@ public class PinotSegmentRestletResource {
return new SuccessResponse(JsonUtils.objectToString(perTableMsgData));
}
+ private Map<String, Map<String, Map<String, String>>>
reloadSegments(List<String> tableNamesWithType,
+ boolean forceDownload, Map<String, List<String>> instanceToSegmentsMap) {
+ long startTimeMs = System.currentTimeMillis();
+ Map<String, Map<String, Map<String, String>>> tableInstanceMsgData = new
LinkedHashMap<>();
+ for (String tableNameWithType : tableNamesWithType) {
+ Map<String, Pair<Integer, String>> instanceMsgInfoMap =
+ _pinotHelixResourceManager.reloadSegments(tableNameWithType,
forceDownload, instanceToSegmentsMap);
+ Map<String, Map<String, String>> instanceMsgData =
+ tableInstanceMsgData.computeIfAbsent(tableNameWithType, t -> new
HashMap<>());
+ for (Map.Entry<String, Pair<Integer, String>> instanceMsgInfo :
instanceMsgInfoMap.entrySet()) {
+ String instance = instanceMsgInfo.getKey();
+ Pair<Integer, String> msgInfo = instanceMsgInfo.getValue();
+ int numReloadMsgSent = msgInfo.getLeft();
+ if (numReloadMsgSent <= 0) {
+ continue;
+ }
+ Map<String, String> tableReloadMeta = new HashMap<>();
+ tableReloadMeta.put("numMessagesSent",
String.valueOf(numReloadMsgSent));
+ tableReloadMeta.put("reloadJobId", msgInfo.getRight());
+ instanceMsgData.put(instance, tableReloadMeta);
+ // Store in ZK
+ try {
+ String segmentNames =
+ StringUtils.join(instanceToSegmentsMap.get(instance),
SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
+ if
(_pinotHelixResourceManager.addNewReloadSegmentJob(tableNameWithType,
segmentNames, instance,
+ msgInfo.getRight(), startTimeMs, numReloadMsgSent)) {
+ tableReloadMeta.put("reloadJobMetaZKStorageStatus", "SUCCESS");
+ } else {
+ tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
+ LOGGER.error("Failed to add batch reload job meta into zookeeper
for table: {} targeted instance: {}",
+ tableNameWithType, instance);
+ }
+ } catch (Exception e) {
+ tableReloadMeta.put("reloadJobMetaZKStorageStatus", "FAILED");
+ LOGGER.error("Failed to add batch reload job meta into zookeeper for
table: {} targeted instance: {}",
+ tableNameWithType, instance, e);
+ }
+ }
+ }
+ return tableInstanceMsgData;
+ }
+
@DELETE
@Produces(MediaType.APPLICATION_JSON)
@Path("/segments/{tableName}/{segmentName}")
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1f1e95877f..42bd8c4ac4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -2193,56 +2193,65 @@ public class PinotHelixResourceManager {
/**
* Adds a new reload segment job metadata into ZK
* @param tableNameWithType Table for which job is to be added
- * @param segmentName Name of the segment being reloaded
+ * @param segmentNames Names of the segments being reloaded, separated by
'|' (SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
+ * @param instanceName Name of the instance doing the segment reloading,
optional.
* @param jobId job's UUID
* @param jobSubmissionTimeMs time at which the job was submitted
* @param numMessagesSent number of messages that were sent to servers.
Saved as metadata
* @return boolean representing success / failure of the ZK write step
*/
- public boolean addNewReloadSegmentJob(String tableNameWithType, String
segmentName, String jobId,
- long jobSubmissionTimeMs, int numMessagesSent) {
+ public boolean addNewReloadSegmentJob(String tableNameWithType, String
segmentNames, @Nullable String instanceName,
+ String jobId, long jobSubmissionTimeMs, int numMessagesSent) {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.RELOAD_SEGMENT);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
Integer.toString(numMessagesSent));
-
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME,
segmentName);
+
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_SEGMENT_NAME,
segmentNames);
+ if (instanceName != null) {
+
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME,
instanceName);
+ }
return addControllerJobToZK(jobId, jobMetadata,
ControllerJobType.RELOAD_SEGMENT);
}
- public boolean addNewForceCommitJob(String tableNameWithType, String jobId,
long jobSubmissionTimeMs,
- Set<String> consumingSegmentsCommitted)
- throws JsonProcessingException {
- Map<String, String> jobMetadata = new HashMap<>();
- jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
- jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
- jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.FORCE_COMMIT);
- jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
-
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
- JsonUtils.objectToString(consumingSegmentsCommitted));
- return addControllerJobToZK(jobId, jobMetadata,
ControllerJobType.FORCE_COMMIT);
- }
-
/**
* Adds a new reload segment job metadata into ZK
* @param tableNameWithType Table for which job is to be added
+ * @param instanceName Name of the instance doing the segment reloading,
optional.
* @param jobId job's UUID
* @param jobSubmissionTimeMs time at which the job was submitted
* @param numberOfMessagesSent number of messages that were sent to servers.
Saved as metadata
* @return boolean representing success / failure of the ZK write step
*/
- public boolean addNewReloadAllSegmentsJob(String tableNameWithType, String
jobId, long jobSubmissionTimeMs,
- int numberOfMessagesSent) {
+ public boolean addNewReloadAllSegmentsJob(String tableNameWithType,
@Nullable String instanceName, String jobId,
+ long jobSubmissionTimeMs, int numberOfMessagesSent) {
Map<String, String> jobMetadata = new HashMap<>();
jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.RELOAD_SEGMENT);
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.MESSAGE_COUNT,
Integer.toString(numberOfMessagesSent));
+ if (instanceName != null) {
+
jobMetadata.put(CommonConstants.ControllerJob.SEGMENT_RELOAD_JOB_INSTANCE_NAME,
instanceName);
+ }
return addControllerJobToZK(jobId, jobMetadata,
ControllerJobType.RELOAD_SEGMENT);
}
+
+ public boolean addNewForceCommitJob(String tableNameWithType, String jobId,
long jobSubmissionTimeMs,
+ Set<String> consumingSegmentsCommitted)
+ throws JsonProcessingException {
+ Map<String, String> jobMetadata = new HashMap<>();
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
+ jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE,
tableNameWithType);
+ jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE,
ControllerJobType.FORCE_COMMIT);
+ jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
+
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
+ JsonUtils.objectToString(consumingSegmentsCommitted));
+ return addControllerJobToZK(jobId, jobMetadata,
ControllerJobType.FORCE_COMMIT);
+ }
+
/**
* Adds a new job metadata for controller job like table rebalance or reload
into ZK
* @param jobId job's UUID
@@ -2605,6 +2614,42 @@ public class PinotHelixResourceManager {
sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true);
}
+ public Map<String, Pair<Integer, String>> reloadSegments(String
tableNameWithType, boolean forceDownload,
+ Map<String, List<String>> instanceToSegmentsMap) {
+ LOGGER.info("Sending reload messages for table: {} with forceDownload: {},
and instanceToSegmentsMap: {}",
+ tableNameWithType, forceDownload, instanceToSegmentsMap);
+
+ if (forceDownload) {
+ TableType tt =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
+ // TODO: support to force download immutable segments from RealTime
table.
+ Preconditions.checkArgument(tt == TableType.OFFLINE,
+ "Table: %s is not an OFFLINE table, which is required to force to
download segments", tableNameWithType);
+ }
+ // Infinite timeout on the recipient
+ int timeoutMs = -1;
+ Map<String, Pair<Integer, String>> instanceMsgInfoMap = new HashMap<>();
+ for (Map.Entry<String, List<String>> entry :
instanceToSegmentsMap.entrySet()) {
+ String targetInstance = entry.getKey();
+ List<String> segments = entry.getValue();
+ Criteria recipientCriteria = new Criteria();
+ recipientCriteria.setRecipientInstanceType(InstanceType.PARTICIPANT);
+ recipientCriteria.setInstanceName(targetInstance);
+ recipientCriteria.setResource(tableNameWithType);
+ recipientCriteria.setSessionSpecific(true);
+ SegmentReloadMessage segmentReloadMessage = new
SegmentReloadMessage(tableNameWithType, segments, forceDownload);
+ ClusterMessagingService messagingService =
_helixZkManager.getMessagingService();
+ int numMessagesSent = messagingService.send(recipientCriteria,
segmentReloadMessage, null, timeoutMs);
+ if (numMessagesSent > 0) {
+ LOGGER.info("Sent {} reload messages to instance: {} for table: {}",
numMessagesSent, targetInstance,
+ tableNameWithType);
+ } else {
+ LOGGER.warn("No reload message sent to instance: {} for table: {}",
targetInstance, tableNameWithType);
+ }
+ instanceMsgInfoMap.put(targetInstance, Pair.of(numMessagesSent,
segmentReloadMessage.getMsgId()));
+ }
+ return instanceMsgInfoMap;
+ }
+
public Pair<Integer, String> reloadAllSegments(String tableNameWithType,
boolean forceDownload,
@Nullable String targetInstance) {
LOGGER.info("Sending reload message for table: {} with forceDownload: {},
and target: {}", tableNameWithType,
@@ -2985,6 +3030,10 @@ public class PinotHelixResourceManager {
* the ideal state because they are not supposed to be served.
*/
public Map<String, List<String>> getServerToSegmentsMap(String
tableNameWithType) {
+ return getServerToSegmentsMap(tableNameWithType, null);
+ }
+
+ public Map<String, List<String>> getServerToSegmentsMap(String
tableNameWithType, @Nullable String targetServer) {
Map<String, List<String>> serverToSegmentsMap = new TreeMap<>();
IdealState idealState =
_helixAdmin.getResourceIdealState(_helixClusterName, tableNameWithType);
if (idealState == null) {
@@ -2993,8 +3042,12 @@ public class PinotHelixResourceManager {
for (Map.Entry<String, Map<String, String>> entry :
idealState.getRecord().getMapFields().entrySet()) {
String segmentName = entry.getKey();
for (Map.Entry<String, String> instanceStateEntry :
entry.getValue().entrySet()) {
+ String server = instanceStateEntry.getKey();
+ if (targetServer != null && !server.equals(targetServer)) {
+ continue;
+ }
if (!instanceStateEntry.getValue().equals(SegmentStateModel.OFFLINE)) {
- serverToSegmentsMap.computeIfAbsent(instanceStateEntry.getKey(), key
-> new ArrayList<>()).add(segmentName);
+ serverToSegmentsMap.computeIfAbsent(server, key -> new
ArrayList<>()).add(segmentName);
}
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
new file mode 100644
index 0000000000..392fc05bd8
--- /dev/null
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResourceTest.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.api.resources;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.mockito.InjectMocks;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+
+
+public class PinotSegmentRestletResourceTest {
+ @Mock
+ PinotHelixResourceManager _pinotHelixResourceManager;
+
+ @InjectMocks
+ PinotSegmentRestletResource _pinotSegmentRestletResource;
+
+ @BeforeMethod
+ public void setup() {
+ MockitoAnnotations.openMocks(this);
+ }
+
+ @Test
+ public void testGetServerToSegments() {
+ String tableName = "testTable";
+ Map<String, List<String>> fullServerToSegmentsMap = new HashMap<>();
+ fullServerToSegmentsMap.put("svr01", new ArrayList<>(List.of("seg01",
"seg02")));
+ fullServerToSegmentsMap.put("svr02", new ArrayList<>(List.of("seg02",
"seg03")));
+ fullServerToSegmentsMap.put("svr03", new ArrayList<>(List.of("seg03",
"seg01")));
+ when(_pinotHelixResourceManager.getServerToSegmentsMap(tableName,
null)).thenReturn(fullServerToSegmentsMap);
+ when(_pinotHelixResourceManager.getServerToSegmentsMap(tableName,
"svr02")).thenReturn(
+ Map.of("svr02", new ArrayList<>(List.of("seg02", "seg03"))));
+ when(_pinotHelixResourceManager.getServers(tableName,
"seg01")).thenReturn(Set.of("svr01", "svr03"));
+
+ // Get all servers and all their segments.
+ Map<String, List<String>> serverToSegmentsMap =
+ _pinotSegmentRestletResource.getServerToSegments(tableName, null,
null);
+ assertEquals(serverToSegmentsMap, fullServerToSegmentsMap);
+
+ // Get all segments on svr02.
+ serverToSegmentsMap =
_pinotSegmentRestletResource.getServerToSegments(tableName, null, "svr02");
+ assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg02",
"seg03")));
+
+ // Get all servers with seg01.
+ serverToSegmentsMap =
_pinotSegmentRestletResource.getServerToSegments(tableName, "seg01", null);
+ assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01"),
"svr03", List.of("seg01")));
+
+ // Simply map the provided server to the provided segments.
+ serverToSegmentsMap =
_pinotSegmentRestletResource.getServerToSegments(tableName, "seg01", "svr01");
+ assertEquals(serverToSegmentsMap, Map.of("svr01", List.of("seg01")));
+ serverToSegmentsMap =
_pinotSegmentRestletResource.getServerToSegments(tableName, "anySegment",
"anyServer");
+ assertEquals(serverToSegmentsMap, Map.of("anyServer",
List.of("anySegment")));
+ serverToSegmentsMap =
_pinotSegmentRestletResource.getServerToSegments(tableName, "seg01|seg02",
"svr02");
+ assertEquals(serverToSegmentsMap, Map.of("svr02", List.of("seg01",
"seg02")));
+ try {
+ _pinotSegmentRestletResource.getServerToSegments(tableName,
"seg01,seg02", null);
+ } catch (Exception e) {
+ assertTrue(e.getMessage().contains("Only one segment is expected but
got: [seg01, seg02]"));
+ }
+ }
+}
diff --git
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameUtils.java
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameUtils.java
index f3000c994a..fcd2ec55da 100644
---
a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameUtils.java
+++
b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/creator/name/SegmentNameUtils.java
@@ -25,6 +25,8 @@ import java.util.regex.Pattern;
* Utils for segment names.
*/
public class SegmentNameUtils {
+ // According to the invalid name pattern below, `|` is safer than `,` as the
segment name separator.
+ public static final char SEGMENT_NAME_SEPARATOR = '|';
private static final Pattern INVALID_SEGMENT_NAME_REGEX =
Pattern.compile(".*[\\\\/:\\*?\"<>|].*");
private SegmentNameUtils() {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
index 66ebdd88cc..9bd96c0da9 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/ControllerJobStatusResource.java
@@ -20,6 +20,8 @@ package org.apache.pinot.server.api.resources;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import javax.inject.Inject;
import javax.ws.rs.GET;
@@ -30,9 +32,11 @@ import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
+import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.common.utils.DatabaseUtils;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.spi.creator.name.SegmentNameUtils;
import org.apache.pinot.server.starter.ServerInstance;
import org.apache.pinot.server.starter.helix.SegmentReloadStatusValue;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -51,45 +55,32 @@ public class ControllerJobStatusResource {
@ApiOperation(value = "Task status", notes = "Return the status of a given
reload job")
public String reloadJobStatus(@PathParam("tableNameWithType") String
tableNameWithType,
@QueryParam("reloadJobTimestamp") long reloadJobSubmissionTimestamp,
- @QueryParam("segmentName") String segmentName,
- @Context HttpHeaders headers)
+ @QueryParam("segmentName") String segmentName, @Context HttpHeaders
headers)
throws Exception {
tableNameWithType = DatabaseUtils.translateTableName(tableNameWithType,
headers);
TableDataManager tableDataManager =
ServerResourceUtils.checkGetTableDataManager(_serverInstance,
tableNameWithType);
-
+ List<SegmentDataManager> segmentDataManagers;
+ long totalSegmentCount;
if (segmentName == null) {
- // All segments
- List<SegmentDataManager> allSegments =
tableDataManager.acquireAllSegments();
- try {
- long successCount = 0;
- for (SegmentDataManager segmentDataManager : allSegments) {
- if (segmentDataManager.getLoadTimeMs() >=
reloadJobSubmissionTimestamp) {
- successCount++;
- }
- }
- SegmentReloadStatusValue segmentReloadStatusValue =
- new SegmentReloadStatusValue(allSegments.size(), successCount);
- return JsonUtils.objectToString(segmentReloadStatusValue);
- } finally {
- for (SegmentDataManager segmentDataManager : allSegments) {
- tableDataManager.releaseSegment(segmentDataManager);
- }
- }
+ segmentDataManagers = tableDataManager.acquireAllSegments();
+ totalSegmentCount = segmentDataManagers.size();
} else {
- SegmentDataManager segmentDataManager =
tableDataManager.acquireSegment(segmentName);
- if (segmentDataManager == null) {
- return JsonUtils.objectToString(new SegmentReloadStatusValue(0, 0));
- }
- try {
- int successCount = 0;
+ List<String> targetSegments = new ArrayList<>();
+ Collections.addAll(targetSegments, StringUtils.split(segmentName,
SegmentNameUtils.SEGMENT_NAME_SEPARATOR));
+ segmentDataManagers = tableDataManager.acquireSegments(targetSegments,
new ArrayList<>());
+ totalSegmentCount = targetSegments.size();
+ }
+ try {
+ long successCount = 0;
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
if (segmentDataManager.getLoadTimeMs() >=
reloadJobSubmissionTimestamp) {
- successCount = 1;
+ successCount++;
}
- SegmentReloadStatusValue segmentReloadStatusValue =
- new SegmentReloadStatusValue(1, successCount);
- return JsonUtils.objectToString(segmentReloadStatusValue);
- } finally {
+ }
+ return JsonUtils.objectToString(new
SegmentReloadStatusValue(totalSegmentCount, successCount));
+ } finally {
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
tableDataManager.releaseSegment(segmentDataManager);
}
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 89046dec81..06c7184f4e 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -978,6 +978,7 @@ public class CommonConstants {
* Segment reload job ZK props
*/
public static final String SEGMENT_RELOAD_JOB_SEGMENT_NAME = "segmentName";
+ public static final String SEGMENT_RELOAD_JOB_INSTANCE_NAME =
"instanceName";
// Force commit job ZK props
public static final String CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST =
"segmentsForceCommitted";
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]