somandal commented on code in PR #14250:
URL: https://github.com/apache/pinot/pull/14250#discussion_r2294451175


##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java:
##########
@@ -506,6 +513,19 @@ private String generateStaleSegmentsServerURL(String 
tableNameWithType, String e
     return String.format("%s/tables/%s/segments/isStale", endpoint, 
tableNameWithType);
   }
 
+  private String generateSegmentsParam(Set<String> values) {

Review Comment:
   nit: values -> segments?



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java:
##########
@@ -434,14 +434,21 @@ private String 
generateAggregateSegmentMetadataServerURL(String tableNameWithTyp
     return String.format("%s/tables/%s/metadata?%s", endpoint, 
tableNameWithType, paramsStr);
   }
 
-  private String generateSegmentMetadataServerURL(String tableNameWithType, 
String segmentName, List<String> columns,
+  public String generateSegmentMetadataServerURL(String tableNameWithType, 
String segmentName, List<String> columns,
       String endpoint) {
     tableNameWithType = URLEncoder.encode(tableNameWithType, 
StandardCharsets.UTF_8);
     segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
     String paramsStr = generateColumnsParam(columns);
     return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint, 
tableNameWithType, segmentName, paramsStr);
   }
 
+  public String generateTableMetadataServerURL(String tableNameWithType, 
List<String> columns,
+      Set<String> segmentsToInclude, String endpoint) {
+    tableNameWithType = URLEncoder.encode(tableNameWithType, 
StandardCharsets.UTF_8);
+    String paramsStr = generateColumnsParam(columns) + 
generateSegmentsParam(segmentsToInclude);

Review Comment:
   Has this been tested with a very large number of segments (such as the 75K+ 
mentioned in the issue)? Is there a concern that the request URL could become 
too long?
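
One way to bound the URL size, sketched under assumptions: split the segment list into batches so that each request's encoded parameters stay under a character budget. The class name `SegmentBatcher`, the `&segments=` parameter format, and the budget value are hypothetical and not taken from the PR.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class SegmentBatcher {
  // Hypothetical helper: groups segment names into batches whose encoded
  // "&segments=<name>" parameters fit within maxParamChars per request.
  public static List<List<String>> batchByEncodedLength(List<String> segments, int maxParamChars) {
    List<List<String>> batches = new ArrayList<>();
    List<String> current = new ArrayList<>();
    int currentLen = 0;
    for (String segment : segments) {
      // "&segments=" is an assumed parameter format, not the PR's actual one.
      int paramLen = "&segments=".length()
          + URLEncoder.encode(segment, StandardCharsets.UTF_8).length();
      // Start a new batch when adding this segment would exceed the budget.
      if (!current.isEmpty() && currentLen + paramLen > maxParamChars) {
        batches.add(current);
        current = new ArrayList<>();
        currentLen = 0;
      }
      current.add(segment);
      currentLen += paramLen;
    }
    if (!current.isEmpty()) {
      batches.add(current);
    }
    return batches;
  }
}
```

A segment whose encoded parameter alone exceeds the budget still gets its own batch, so no segment is dropped.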



##########
pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java:
##########
@@ -409,6 +411,62 @@ public String getSegmentMetadata(
     }
   }
 
+  @GET
+  @Encoded
+  @Path("/tables/{tableName}/segments/metadata")

Review Comment:
   Would it make sense to special-case the scenario where we want to fetch ALL 
segments (which is what the API call expects)? In that case we would not pass a 
segment list, and the server would return all segments. That would also keep 
the params short.
   
   cc @Jackie-Jiang any thoughts on the above as well?
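
A minimal sketch of that special case, assuming a missing parameter is interpreted as "all segments": the builder emits nothing for a null or empty filter. The class `SegmentsParam` and the `&segments=` format are hypothetical illustrations, not the PR's `generateSegmentsParam`.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import java.util.TreeSet;

public class SegmentsParam {
  // Returns "" when no filter is given, so the request carries no segment list
  // and the receiver can treat the absent parameter as "all segments".
  public static String build(Set<String> segments) {
    if (segments == null || segments.isEmpty()) {
      return "";
    }
    StringBuilder sb = new StringBuilder();
    // Sorted copy only to make the output deterministic.
    for (String segment : new TreeSet<>(segments)) {
      sb.append("&segments=").append(URLEncoder.encode(segment, StandardCharsets.UTF_8));
    }
    return sb.toString();
  }
}
```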



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java:
##########
@@ -127,50 +131,93 @@ private TableReloadJsonResponse 
processSegmentMetadataReloadResponse(
 
   /**
    * This api takes in list of segments for which we need the metadata.
+   * This calls the server to get the metadata for all segments instead of 
making a call per segment.
    */
-  public JsonNode getSegmentsMetadata(String tableNameWithType, List<String> 
columns, Set<String> segmentsToInclude,
-      int timeoutMs)
+  public JsonNode getSegmentsMetadata(String tableNameWithType, @Nullable 
List<String> columns,
+      @Nullable Set<String> segments, int timeoutMs)
       throws InvalidConfigException, IOException {
-    return getSegmentsMetadataInternal(tableNameWithType, columns, 
segmentsToInclude, timeoutMs);
+    return getSegmentsMetadataInternal(tableNameWithType, columns, segments, 
timeoutMs);
   }
 
-  private JsonNode getSegmentsMetadataInternal(String tableNameWithType, 
List<String> columns,
-      Set<String> segmentsToInclude, int timeoutMs)
+  /**
+   * Common helper used by both the new (server-level) and legacy 
(segment-level) endpoints.
+   */
+  private JsonNode fetchAndAggregateMetadata(List<String> urls, BiMap<String, 
String> endpoints, boolean perSegmentJson,
+      String tableNameWithType, int timeoutMs)
       throws InvalidConfigException, IOException {
-    final Map<String, List<String>> serverToSegmentsMap =
-        _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
-    BiMap<String, String> endpoints =
-        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegmentsMap.keySet());
-    ServerSegmentMetadataReader serverSegmentMetadataReader =
-        new ServerSegmentMetadataReader(_executor, _connectionManager);
+    CompletionServiceHelper cs = new CompletionServiceHelper(_executor, 
_connectionManager, endpoints);
+    CompletionServiceHelper.CompletionServiceResponse resp =
+        cs.doMultiGetRequest(urls, tableNameWithType, perSegmentJson, 
timeoutMs);
+    // all requests will fail if new server endpoint is not available
+    if (resp._failedResponseCount > 0) {
+      throw new RuntimeException("All requests to server instances failed.");
+    }
 
-    // Filter segments that we need
-    for (Map.Entry<String, List<String>> serverToSegment : 
serverToSegmentsMap.entrySet()) {
-      List<String> segments = serverToSegment.getValue();
-      if (segmentsToInclude != null && !segmentsToInclude.isEmpty()) {
-        segments.retainAll(segmentsToInclude);
+    ObjectMapper mapper = new ObjectMapper();
+    ObjectNode aggregatedNode = mapper.createObjectNode();
+    for (String body : resp._httpResponses.values()) {
+      JsonNode node = JsonUtils.stringToJsonNode(body);
+      // legacy returns one JSON per segment; new returns one JSON with many 
fields
+      if (perSegmentJson) {
+        String segmentName = node.get("segmentName").asText();
+        aggregatedNode.set(segmentName, node);
+      } else {
+        node.fields().forEachRemaining(entry -> 
aggregatedNode.set(entry.getKey(), entry.getValue()));
       }
     }
+    return aggregatedNode;
+  }
 
-    List<String> segmentsMetadata =
-        
serverSegmentMetadataReader.getSegmentMetadataFromServer(tableNameWithType, 
serverToSegmentsMap, endpoints,
-            columns, timeoutMs);
-    Map<String, JsonNode> response = new HashMap<>();
-    for (String segmentMetadata : segmentsMetadata) {
-      JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
-      response.put(responseJson.get("segmentName").asText(), responseJson);
+  private List<String> buildTableLevelUrls(Map<String, List<String>> 
serverToSegs, BiMap<String, String> endpoints,
+      String tableNameWithType, List<String> columns, Set<String> 
segmentsFilter, ServerSegmentMetadataReader reader) {
+    List<String> urls = new ArrayList<>(serverToSegs.size());
+    for (String server : serverToSegs.keySet()) {
+      urls.add(reader.generateTableMetadataServerURL(
+          tableNameWithType, columns, segmentsFilter, endpoints.get(server)));
     }
-    return JsonUtils.objectToJsonNode(response);
+    return urls;
   }
 
-  /**
-   * This method retrieves the full segment metadata for a given table.
-   * Currently supports only OFFLINE tables.
-   * @return a map of segmentName to its metadata
-   */
-  public JsonNode getSegmentsMetadata(String tableNameWithType, List<String> 
columns, int timeoutMs)
+  private List<String> buildSegmentLevelUrls(Map<String, List<String>> 
serverToSegs, BiMap<String, String> endpoints,
+      String tableNameWithType, List<String> columns, Set<String> 
segmentsFilter, ServerSegmentMetadataReader reader) {
+    List<String> urls = new ArrayList<>();
+    for (Map.Entry<String, List<String>> e : serverToSegs.entrySet()) {
+      for (String segment : e.getValue()) {
+        if (segmentsFilter == null || segmentsFilter.isEmpty()
+            || segmentsFilter.contains(segment)) {
+          urls.add(reader.generateSegmentMetadataServerURL(
+              tableNameWithType, segment, columns, endpoints.get(e.getKey())));
+        }
+      }
+    }
+    return urls;
+  }
+
+  private JsonNode getSegmentsMetadataInternal(String tableNameWithType, 
@Nullable List<String> columns,
+      @Nullable Set<String> segments, int timeoutMs)
       throws InvalidConfigException, IOException {
-    return getSegmentsMetadataInternal(tableNameWithType, columns, null, 
timeoutMs);
+    Map<String, List<String>> serverToSegs =
+        _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+    BiMap<String, String> endpoints =
+        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegs.keySet());
+    ServerSegmentMetadataReader reader =
+        new ServerSegmentMetadataReader(_executor, _connectionManager);
+
+    // try table level endpoint first
+    try {
+      List<String> tableUrls = buildTableLevelUrls(serverToSegs, endpoints,
+          tableNameWithType, columns, segments, reader);
+      return fetchAndAggregateMetadata(tableUrls, endpoints, 
/*perSegmentJson=*/false,
+          tableNameWithType, timeoutMs);
+    } catch (RuntimeException ignore) {
+      // fall through to legacy

Review Comment:
   nit: can a warning be logged here?
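
A rough sketch of what logging on the fallback path could look like. It uses `java.util.logging` only to stay self-contained; the project's own logger would take its place. `FallbackExample` and `withFallback` are hypothetical names.

```java
import java.util.function.Supplier;
import java.util.logging.Level;
import java.util.logging.Logger;

public class FallbackExample {
  private static final Logger LOGGER = Logger.getLogger(FallbackExample.class.getName());

  // Tries the primary (table-level) call; on RuntimeException logs a warning
  // with the cause and then runs the legacy (segment-level) call.
  public static <T> T withFallback(String tableNameWithType, Supplier<T> primary, Supplier<T> legacy) {
    try {
      return primary.get();
    } catch (RuntimeException e) {
      LOGGER.log(Level.WARNING, "Table-level metadata fetch failed for table: " + tableNameWithType
          + ", falling back to segment-level calls", e);
      return legacy.get();
    }
  }
}
```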



##########
pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java:
##########
@@ -127,50 +131,93 @@ private TableReloadJsonResponse 
processSegmentMetadataReloadResponse(
 
   /**
    * This api takes in list of segments for which we need the metadata.
+   * This calls the server to get the metadata for all segments instead of 
making a call per segment.
    */
-  public JsonNode getSegmentsMetadata(String tableNameWithType, List<String> 
columns, Set<String> segmentsToInclude,
-      int timeoutMs)
+  public JsonNode getSegmentsMetadata(String tableNameWithType, @Nullable 
List<String> columns,
+      @Nullable Set<String> segments, int timeoutMs)
       throws InvalidConfigException, IOException {
-    return getSegmentsMetadataInternal(tableNameWithType, columns, 
segmentsToInclude, timeoutMs);
+    return getSegmentsMetadataInternal(tableNameWithType, columns, segments, 
timeoutMs);
   }
 
-  private JsonNode getSegmentsMetadataInternal(String tableNameWithType, 
List<String> columns,
-      Set<String> segmentsToInclude, int timeoutMs)
+  /**
+   * Common helper used by both the new (server-level) and legacy 
(segment-level) endpoints.
+   */
+  private JsonNode fetchAndAggregateMetadata(List<String> urls, BiMap<String, 
String> endpoints, boolean perSegmentJson,
+      String tableNameWithType, int timeoutMs)
       throws InvalidConfigException, IOException {
-    final Map<String, List<String>> serverToSegmentsMap =
-        _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
-    BiMap<String, String> endpoints =
-        
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegmentsMap.keySet());
-    ServerSegmentMetadataReader serverSegmentMetadataReader =
-        new ServerSegmentMetadataReader(_executor, _connectionManager);
+    CompletionServiceHelper cs = new CompletionServiceHelper(_executor, 
_connectionManager, endpoints);
+    CompletionServiceHelper.CompletionServiceResponse resp =
+        cs.doMultiGetRequest(urls, tableNameWithType, perSegmentJson, 
timeoutMs);
+    // all requests will fail if new server endpoint is not available
+    if (resp._failedResponseCount > 0) {
+      throw new RuntimeException("All requests to server instances failed.");

Review Comment:
   question: is it not possible to get partial results, where some servers 
return data but others fail? Or is the whole request treated as failed if even 
one server fails to return a response?
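
For context, the check in the diff is `_failedResponseCount > 0`, while the exception message says all requests failed. A small sketch of how the three cases could be told apart, assuming the number of issued requests is known; `FailurePolicy` and `Outcome` are hypothetical names.

```java
public class FailurePolicy {
  public enum Outcome { SUCCESS, PARTIAL, ALL_FAILED }

  // Classifies a multi-get result from the request and failure counts.
  public static Outcome classify(int requestCount, int failedCount) {
    if (failedCount <= 0) {
      return Outcome.SUCCESS;
    }
    if (failedCount >= requestCount) {
      return Outcome.ALL_FAILED;
    }
    // Some servers answered and some did not.
    return Outcome.PARTIAL;
  }
}
```

With such a split, the fallback could be limited to `ALL_FAILED`, and `PARTIAL` could be reported explicitly instead of being folded into the same exception.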



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

