This is an automated email from the ASF dual-hosted git repository.
siddteotia pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 66005a7ab4 Encode Segment Reload Request (#15870)
66005a7ab4 is described below
commit 66005a7ab4077698e5725b58a3e42d4e86c10eb6
Author: Praveen <[email protected]>
AuthorDate: Sun May 25 22:04:19 2025 -0700
Encode Segment Reload Request (#15870)
* Segment Encoding
* review comments
---
.../api/resources/PinotSegmentRestletResource.java | 17 +++++++++++---
.../controller/helix/ControllerRequestClient.java | 7 +++---
.../pinot/controller/helix/ControllerTest.java | 4 ++--
.../tests/OfflineClusterIntegrationTest.java | 26 ++++++++++++++++++++++
4 files changed, 46 insertions(+), 8 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 4c3e1f23d1..7455ac5f4e 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
@@ -558,9 +559,19 @@ public class PinotSegmentRestletResource {
endpoint + "/controllerJob/reloadStatus/" + tableNameWithType +
"?reloadJobTimestamp="
+
controllerJobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS);
if (segmentNames != null) {
- List<String> targetSegments = serverToSegments.get(server);
- reloadTaskStatusEndpoint = reloadTaskStatusEndpoint + "&segmentName="
+ StringUtils.join(targetSegments,
- SegmentNameUtils.SEGMENT_NAME_SEPARATOR);
+ List<String> segmentsForServer = serverToSegments.get(server);
+ StringBuilder encodedSegmentsBuilder = new StringBuilder();
+ if (!segmentsForServer.isEmpty()) {
+ Iterator<String> segmentIterator = segmentsForServer.iterator();
+ // Append first segment without a leading separator
+
encodedSegmentsBuilder.append(URIUtils.encode(segmentIterator.next()));
+ // Append remaining segments, each prefixed by the separator
+ while (segmentIterator.hasNext()) {
+
encodedSegmentsBuilder.append(SegmentNameUtils.SEGMENT_NAME_SEPARATOR)
+
.append(URIUtils.encode(segmentIterator.next()));
+ }
+ }
+ reloadTaskStatusEndpoint += "&segmentName=" + encodedSegmentsBuilder;
}
serverUrls.add(reloadTaskStatusEndpoint);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
index 7aa8f41de6..e619cbf008 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/ControllerRequestClient.java
@@ -241,11 +241,12 @@ public class ControllerRequestClient {
}
}
- public void reloadSegment(String tableName, String segmentName, boolean
forceReload)
+ public String reloadSegment(String tableName, String segmentName, boolean
forceReload)
throws IOException {
try {
- HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(
- new URI(_controllerRequestURLBuilder.forSegmentReload(tableName,
segmentName, forceReload)), null, _headers));
+ SimpleHttpResponse simpleHttpResponse =
HttpClient.wrapAndThrowHttpException(_httpClient.sendJsonPostRequest(
+ new URI(_controllerRequestURLBuilder.forSegmentReload(tableName,
segmentName, forceReload)), null, _headers));
+ return simpleHttpResponse.getResponse();
} catch (HttpErrorStatusException | URISyntaxException e) {
throw new IOException(e);
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 38c29dfeb7..cfd0f3f6cc 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -844,9 +844,9 @@ public class ControllerTest {
return
getControllerRequestClient().checkIfReloadIsNeeded(tableNameWithType, verbose);
}
- public void reloadOfflineSegment(String tableName, String segmentName,
boolean forceDownload)
+ public String reloadOfflineSegment(String tableName, String segmentName,
boolean forceDownload)
throws IOException {
- getControllerRequestClient().reloadSegment(tableName, segmentName,
forceDownload);
+ return getControllerRequestClient().reloadSegment(tableName, segmentName,
forceDownload);
}
public String reloadRealtimeTable(String tableName)
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index f8cd7d67b9..1c87c1eee3 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -4060,4 +4060,30 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertEquals(result.get("clientRequestId").asText(), clientRequestId);
}
+
+ @Test
+ public void testSegmentReload() {
+ try {
+ // Reload a single segment in the offline table
+ String segmentName = listSegments(DEFAULT_TABLE_NAME +
"_OFFLINE").get(0);
+ String response = reloadOfflineSegment(DEFAULT_TABLE_NAME + "_OFFLINE",
segmentName, true);
+ JsonNode responseJson = JsonUtils.stringToJsonNode(response);
+
+ // Single segment reload response: status is a string, parse manually
+ String statusString = responseJson.get("status").asText();
+ assertTrue(statusString.contains("SUCCESS"), "Segment reload failed: " +
statusString);
+ int startIdx = statusString.indexOf("reload job id:") + "reload job
id:".length();
+ int endIdx = statusString.indexOf(',', startIdx);
+ String jobId = statusString.substring(startIdx, endIdx).trim();
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ return isReloadJobCompleted(jobId);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }, 600_000L, "Reload job did not complete in 10 minutes");
+ } catch (Exception e) {
+ fail("Segment reload failed with exception: " + e.getMessage());
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]