snleee commented on code in PR #12275:
URL: https://github.com/apache/pinot/pull/12275#discussion_r1457116382


##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java:
##########
@@ -180,27 +199,126 @@ public List<String> getSegmentMetadataFromServer(String 
tableNameWithType,
     return segmentsMetadata;
   }
 
+  /**
+   * This method is called when the API request is to fetch validDocId 
metadata for a list segments of the given table.
+   * This method will pick a server that hosts the target segment and fetch 
the segment metadata result.
+   *
+   * @return segment metadata as a JSON string
+   */
+  public List<ValidDocIdMetadataInfo> getValidDocIdMetadataFromServer(String 
tableNameWithType,
+      Map<String, List<String>> serverToSegmentsMap, BiMap<String, String> 
serverToEndpoints,
+      @Nullable List<String> segmentNames, int timeoutMs) {
+    List<Pair<String, String>> serverURLsAndBodies = new ArrayList<>();
+    for (Map.Entry<String, List<String>> serverToSegments : 
serverToSegmentsMap.entrySet()) {
+      List<String> segmentsForServer = serverToSegments.getValue();
+      List<String> segmentsToQuery = new ArrayList<>();
+      for (String segment : segmentsForServer) {
+        if (segmentNames == null) {
+          // If segmentNames is null, query all segments
+          segmentsToQuery.add(segment);
+        } else if (segmentNames.contains(segment)) {
+          segmentsToQuery.add(segment);
+        }
+      }
+      serverURLsAndBodies.add(generateValidDocIdMetadataURL(tableNameWithType, 
segmentsToQuery,
+          serverToEndpoints.get(serverToSegments.getKey())));
+    }
+
+    // request the urls from the servers
+    CompletionServiceHelper completionServiceHelper =
+        new CompletionServiceHelper(_executor, _connectionManager, 
serverToEndpoints);
+
+    Map<String, String> requestHeaders = Map.of("Content-Type", 
"application/json");
+    CompletionServiceHelper.CompletionServiceResponse serviceResponse =
+        completionServiceHelper.doMultiPostRequest(serverURLsAndBodies, 
tableNameWithType, false, requestHeaders,
+            timeoutMs, null);
+
+    List<ValidDocIdMetadataInfo> validDocIdMetadataInfos = new ArrayList<>();
+    int failedParses = 0;
+    int returnedSegmentsCount = 0;
+    for (Map.Entry<String, String> streamResponse : 
serviceResponse._httpResponses.entrySet()) {
+      try {
+        String validDocIdMetadataList = streamResponse.getValue();
+        List<ValidDocIdMetadataInfo> validDocIdMetadataInfo =
+            JsonUtils.stringToObject(validDocIdMetadataList, new 
TypeReference<ArrayList<ValidDocIdMetadataInfo>>() {
+            });
+        validDocIdMetadataInfos.addAll(validDocIdMetadataInfo);
+        returnedSegmentsCount++;
+      } catch (Exception e) {
+        failedParses++;
+        LOGGER.error("Unable to parse server {} response due to an error: ", 
streamResponse.getKey(), e);
+      }
+    }
+    if (failedParses != 0) {
+      LOGGER.error("Unable to parse server {} / {} response due to an error: 
", failedParses,
+          serverURLsAndBodies.size());
+    }
+
+    if (segmentNames != null && returnedSegmentsCount != segmentNames.size()) {
+      LOGGER.error("Unable to get validDocIdMetadata from all servers. 
Expected: {}, Actual: {}", segmentNames.size(),
+          returnedSegmentsCount);
+    }
+    LOGGER.debug("Retrieved segment metadata from servers.");

Review Comment:
   fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to