This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new af3dc6a3b1e Modify segment metadata call (#14250)
af3dc6a3b1e is described below
commit af3dc6a3b1e25b7f5e7a90dfef0ebdc9f249dd0d
Author: Shreyaa Sharma <[email protected]>
AuthorDate: Mon Sep 8 20:57:18 2025 +0530
Modify segment metadata call (#14250)
* Modify segment metadata call
* fix tests
* add per server metadata api
* remove v2 api
* Add v2 API and server test, separate functions
* fix test
* merge getSegmentsMetadataInternalV2 into getSegmentsMetadataInternal
* Address comment and add test
* final fixes
* address comments
* address comments
* update log
---
.../api/resources/PinotSegmentRestletResource.java | 18 ++--
.../util/ServerSegmentMetadataReader.java | 38 ++++---
.../pinot/controller/util/TableMetadataReader.java | 114 +++++++++++++++------
.../tests/BaseClusterIntegrationTest.java | 8 ++
.../tests/HybridClusterIntegrationTest.java | 76 ++++++++++++--
.../pinot/server/api/resources/TablesResource.java | 60 +++++++++++
.../pinot/server/api/TablesResourceTest.java | 44 ++++++++
.../utils/builder/ControllerRequestURLBuilder.java | 37 ++++++-
8 files changed, 333 insertions(+), 62 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
index 379dcfb7da7..33441ed9067 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentRestletResource.java
@@ -926,17 +926,20 @@ public class PinotSegmentRestletResource {
public String getServerMetadata(
@ApiParam(value = "Name of the table", required = true)
@PathParam("tableName") String tableName,
@ApiParam(value = "OFFLINE|REALTIME") @QueryParam("type") String
tableTypeStr,
- @ApiParam(value = "Columns name", allowMultiple = true)
@QueryParam("columns") @DefaultValue("")
- List<String> columns, @Context HttpHeaders headers) {
+ @Encoded @ApiParam(value = "Segments to include (all if not specified)",
allowMultiple = true)
+ @QueryParam("segments") @Nullable List<String> segments,
+ @Encoded @ApiParam(value = "Columns name", allowMultiple = true)
@QueryParam("columns")
+ @Nullable List<String> columns, @Context HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
- LOGGER.info("Received a request to fetch metadata for all segments for
table {}", tableName);
+ String segmentCount = (segments == null) ? "all" :
String.valueOf(segments.size());
+ LOGGER.info("Received a request to fetch metadata for {} segments for
table {}", segmentCount, tableName);
TableType tableType = Constants.validateTableType(tableTypeStr);
String tableNameWithType =
ResourceUtils.getExistingTableNamesWithType(_pinotHelixResourceManager,
tableName, tableType, LOGGER).get(0);
String segmentsMetadata;
try {
- JsonNode segmentsMetadataJson =
getSegmentsMetadataFromServer(tableNameWithType, columns);
+ JsonNode segmentsMetadataJson =
getSegmentsMetadataFromServer(tableNameWithType, columns, segments);
segmentsMetadata = JsonUtils.objectToPrettyString(segmentsMetadataJson);
} catch (InvalidConfigException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Status.BAD_REQUEST);
@@ -1156,14 +1159,17 @@ public class PinotSegmentRestletResource {
* This is a helper method to get the metadata for all segments for a given
table name.
* @param tableNameWithType name of the table along with its type
* @param columns name of the columns
+ * @param segments name of the segments to include in metadata
* @return Map<String, String> metadata of the table segments -> map of
segment name to its metadata
*/
- private JsonNode getSegmentsMetadataFromServer(String tableNameWithType,
List<String> columns)
+ private JsonNode getSegmentsMetadataFromServer(String tableNameWithType,
@Nullable List<String> columns,
+ @Nullable List<String> segments)
throws InvalidConfigException, IOException {
+ // 'segments' may be null/empty, meaning "every segment of the table": TableMetadataReader then sends no
+ // segment filter to the servers (or, on the legacy fallback, builds a URL for every hosted segment).
TableMetadataReader tableMetadataReader =
new TableMetadataReader(_executor, _connectionManager,
_pinotHelixResourceManager);
+ // The admin request timeout is configured in seconds; TableMetadataReader takes milliseconds (timeoutMs).
return tableMetadataReader
- .getSegmentsMetadata(tableNameWithType, columns,
_controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
+ .getSegmentsMetadata(tableNameWithType, columns, segments,
+ _controllerConf.getServerAdminRequestTimeoutSeconds() * 1000);
}
@POST
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
index 0376b90dac7..c0ebfd5313c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/ServerSegmentMetadataReader.java
@@ -39,6 +39,7 @@ import javax.annotation.Nullable;
import javax.ws.rs.client.ClientBuilder;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
@@ -65,6 +66,8 @@ import org.slf4j.LoggerFactory;
*/
public class ServerSegmentMetadataReader {
private static final Logger LOGGER =
LoggerFactory.getLogger(ServerSegmentMetadataReader.class);
+ private static final String COLUMNS_KEY = "columns";
+ private static final String SEGMENTS_KEY = "segments";
private final Executor _executor;
private final HttpClientConnectionManager _connectionManager;
@@ -430,18 +433,25 @@ public class ServerSegmentMetadataReader {
private String generateAggregateSegmentMetadataServerURL(String
tableNameWithType, List<String> columns,
String endpoint) {
+ // Produces <endpoint>/tables/<URL-encoded table>/metadata?columns=...; only the table name is encoded here,
+ // the column values are appended as-is by generateParam.
tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
- String paramsStr = generateColumnsParam(columns);
+ String paramsStr = generateParam(COLUMNS_KEY, columns);
return String.format("%s/tables/%s/metadata?%s", endpoint,
tableNameWithType, paramsStr);
}
- private String generateSegmentMetadataServerURL(String tableNameWithType,
String segmentName, List<String> columns,
- String endpoint) {
+ /**
+ * Builds the legacy per-segment metadata URL
+ * {@code <endpoint>/tables/<encodedTable>/segments/<encodedSegment>/metadata?columns=...}.
+ * The table and segment names are URL-encoded here; column values are appended as-is.
+ *
+ * @param tableNameWithType table name with type suffix
+ * @param segmentName raw (not yet encoded) segment name
+ * @param columns columns to request, or null/empty for none
+ * @param endpoint server admin endpoint used as the URL prefix
+ * @return the request URL
+ */
+ public String generateSegmentMetadataServerURL(String tableNameWithType,
String segmentName,
+ @Nullable List<String> columns, String endpoint) {
tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
segmentName = URLEncoder.encode(segmentName, StandardCharsets.UTF_8);
- String paramsStr = generateColumnsParam(columns);
+ String paramsStr = generateParam(COLUMNS_KEY, columns);
return String.format("%s/tables/%s/segments/%s/metadata?%s", endpoint,
tableNameWithType, segmentName, paramsStr);
}
+ /**
+ * Builds the server-level URL that returns metadata for many segments of a table in one call:
+ * {@code <endpoint>/tables/<encodedTable>/segments/metadata?columns=..&segments=..}.
+ * A null/empty list contributes no query parameter, and the two parameter groups are only joined with '&'
+ * when both are present (previously a stray '&' was always emitted).
+ *
+ * @param tableNameWithType table name with type suffix; URL-encoded here
+ * @param columns columns to include, or null/empty for none
+ * @param segmentsToInclude segments to include, or null/empty for all segments on the server
+ * @param endpoint server admin endpoint used as the URL prefix
+ * @return the request URL
+ */
+ public String generateTableMetadataServerURL(String tableNameWithType, @Nullable List<String> columns,
+ @Nullable List<String> segmentsToInclude, String endpoint) {
+ tableNameWithType = URLEncoder.encode(tableNameWithType, StandardCharsets.UTF_8);
+ String columnsParam = generateParam(COLUMNS_KEY, columns);
+ String segmentsParam = generateParam(SEGMENTS_KEY, segmentsToInclude);
+ String paramsStr;
+ if (columnsParam.isEmpty()) {
+ paramsStr = segmentsParam;
+ } else if (segmentsParam.isEmpty()) {
+ paramsStr = columnsParam;
+ } else {
+ paramsStr = columnsParam + "&" + segmentsParam;
+ }
+ return String.format("%s/tables/%s/segments/metadata?%s", endpoint, tableNameWithType, paramsStr);
+ }
+
private String generateCheckReloadSegmentsServerURL(String
tableNameWithType, String endpoint) {
tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
return String.format("%s/tables/%s/segments/needReload", endpoint,
tableNameWithType);
@@ -488,24 +498,24 @@ public class ServerSegmentMetadataReader {
return Pair.of(url, jsonTableSegments);
}
- private String generateColumnsParam(List<String> columns) {
+ private String generateStaleSegmentsServerURL(String tableNameWithType,
String endpoint) {
+ tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
+ return String.format("%s/tables/%s/segments/isStale", endpoint,
tableNameWithType);
+ }
+
+ /**
+ * Renders {@code values} as repeated query parameters, e.g. {@code key=v1&key=v2}; returns "" when
+ * {@code values} is null or empty. Values are appended verbatim, without URL encoding.
+ * NOTE(review): the controller forwards the "columns"/"segments" query params with {@code @Encoded}, so they
+ * presumably arrive here already encoded — confirm before passing raw names from other callers.
+ */
+ private String generateParam(String key, List<String> values) {
String paramsStr = "";
- if (columns == null || columns.isEmpty()) {
+ if (CollectionUtils.isEmpty(values)) {
return paramsStr;
}
- List<String> params = new ArrayList<>(columns.size());
- for (String column : columns) {
- params.add(String.format("columns=%s", column));
+ List<String> params = new ArrayList<>(values.size());
+ for (String value : values) {
+ params.add(key + "=" + value);
}
paramsStr = String.join("&", params);
return paramsStr;
}
- private String generateStaleSegmentsServerURL(String tableNameWithType,
String endpoint) {
- tableNameWithType = URLEncoder.encode(tableNameWithType,
StandardCharsets.UTF_8);
- return String.format("%s/tables/%s/segments/isStale", endpoint,
tableNameWithType);
- }
-
public class TableReloadResponse {
private int _numFailedResponses;
private List<String> _serverReloadResponses;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index 26958034f27..71e3e4194ba 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -19,8 +19,11 @@
package org.apache.pinot.controller.util;
import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.BiMap;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -29,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
+import javax.annotation.Nullable;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.helix.model.ExternalView;
import org.apache.pinot.common.exception.InvalidConfigException;
@@ -40,6 +44,8 @@ import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
@@ -50,6 +56,7 @@ import org.apache.pinot.spi.utils.builder.TableNameBuilder;
* the column indexes available.
*/
public class TableMetadataReader {
+ private static final Logger log =
LoggerFactory.getLogger(TableMetadataReader.class);
private final Executor _executor;
private final HttpClientConnectionManager _connectionManager;
private final PinotHelixResourceManager _pinotHelixResourceManager;
@@ -127,50 +134,95 @@ public class TableMetadataReader {
/**
* This api takes in list of segments for which we need the metadata.
+ * This calls the server to get the metadata for all segments instead of
making a call per segment.
+ * If any server-level request fails, it falls back to one legacy request per segment.
+ *
+ * @param tableNameWithType table name with type suffix
+ * @param columns columns whose metadata to include; null/empty for none
+ * @param segments segments to include; null/empty for all segments of the table
+ * @param timeoutMs per-request timeout in milliseconds
+ * @return JSON object mapping segment name to that segment's metadata
*/
- public JsonNode getSegmentsMetadata(String tableNameWithType, List<String>
columns, Set<String> segmentsToInclude,
- int timeoutMs)
+ public JsonNode getSegmentsMetadata(String tableNameWithType, @Nullable
List<String> columns,
+ @Nullable List<String> segments, int timeoutMs)
throws InvalidConfigException, IOException {
- return getSegmentsMetadataInternal(tableNameWithType, columns,
segmentsToInclude, timeoutMs);
+ return getSegmentsMetadataInternal(tableNameWithType, columns, segments,
timeoutMs);
}
- private JsonNode getSegmentsMetadataInternal(String tableNameWithType,
List<String> columns,
- Set<String> segmentsToInclude, int timeoutMs)
+ /**
+ * Issues GET requests for {@code urls} through {@link CompletionServiceHelper} and merges the JSON bodies into a
+ * single object keyed by segment name. Shared by the server-level endpoint and the legacy per-segment endpoint.
+ *
+ * @param urls request URLs (one per server, or one per segment for the legacy endpoint)
+ * @param endpointsToServers admin endpoint -> server instance map handed to CompletionServiceHelper, i.e. the
+ *        inverse of the map returned by getDataInstanceAdminEndpoints
+ * @param perSegmentJson true if each response body is one segment's metadata (legacy endpoint); false if each
+ *        body already maps segment name to metadata (server-level endpoint)
+ * @param tableNameWithType table name with type suffix
+ * @param timeoutMs per-request timeout in milliseconds
+ * @return merged JSON object of segment name to segment metadata
+ * @throws RuntimeException if any request fails, so the caller can fall back or surface the error
+ */
+ private JsonNode fetchAndAggregateMetadata(List<String> urls, BiMap<String, String> endpointsToServers,
+ boolean perSegmentJson, String tableNameWithType, int timeoutMs)
throws InvalidConfigException, IOException {
- final Map<String, List<String>> serverToSegmentsMap =
- _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
- BiMap<String, String> endpoints =
-
_pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegmentsMap.keySet());
- ServerSegmentMetadataReader serverSegmentMetadataReader =
- new ServerSegmentMetadataReader(_executor, _connectionManager);
+ CompletionServiceHelper completionServiceHelper =
+ new CompletionServiceHelper(_executor, _connectionManager, endpointsToServers);
+ CompletionServiceHelper.CompletionServiceResponse resp =
+ completionServiceHelper.doMultiGetRequest(urls, tableNameWithType, perSegmentJson, timeoutMs);
+ // Requests to a server without the server-level endpoint all fail; report any failure to the caller instead
+ // of returning partial metadata. The message is neutral because this helper also serves the legacy path,
+ // where there is nothing further to fall back to and urls.size() counts segments, not servers.
+ if (resp._failedResponseCount > 0) {
+ throw new RuntimeException(String.format("Got %d failed responses out of %d requests while fetching segment "
+ + "metadata for table %s", resp._failedResponseCount, urls.size(), tableNameWithType));
+ }
- // Filter segments that we need
- for (Map.Entry<String, List<String>> serverToSegment :
serverToSegmentsMap.entrySet()) {
- List<String> segments = serverToSegment.getValue();
- if (segmentsToInclude != null && !segmentsToInclude.isEmpty()) {
- segments.retainAll(segmentsToInclude);
+ ObjectMapper mapper = new ObjectMapper();
+ ObjectNode aggregatedNode = mapper.createObjectNode();
+ for (String body : resp._httpResponses.values()) {
+ JsonNode node = JsonUtils.stringToJsonNode(body);
+ if (perSegmentJson) {
+ // Legacy endpoint: one segment's metadata per body, keyed by its own "segmentName" field.
+ aggregatedNode.set(node.get("segmentName").asText(), node);
+ } else {
+ // Server-level endpoint: each body already maps segment name -> metadata; merge the entries.
+ node.fields().forEachRemaining(entry -> aggregatedNode.set(entry.getKey(), entry.getValue()));
}
}
+ return aggregatedNode;
+ }
- List<String> segmentsMetadata =
-
serverSegmentMetadataReader.getSegmentMetadataFromServer(tableNameWithType,
serverToSegmentsMap, endpoints,
- columns, timeoutMs);
- Map<String, JsonNode> response = new HashMap<>();
- for (String segmentMetadata : segmentsMetadata) {
- JsonNode responseJson = JsonUtils.stringToJsonNode(segmentMetadata);
- response.put(responseJson.get("segmentName").asText(), responseJson);
+ /**
+ * Builds one server-level metadata URL per server hosting the table. Every server receives the same
+ * {@code segmentsFilter}, so each server is asked for segments it may not host.
+ * {@code endpoints} maps server instance -> admin endpoint.
+ */
+ private List<String> buildTableLevelUrls(Map<String, List<String>>
serverToSegs, BiMap<String, String> endpoints,
+ String tableNameWithType, List<String> columns, List<String>
segmentsFilter, ServerSegmentMetadataReader reader) {
+ List<String> urls = new ArrayList<>(serverToSegs.size());
+ for (String server : serverToSegs.keySet()) {
+ urls.add(reader.generateTableMetadataServerURL(
+ tableNameWithType, columns, segmentsFilter, endpoints.get(server)));
}
- return JsonUtils.objectToJsonNode(response);
+ return urls;
}
- /**
- * This method retrieves the full segment metadata for a given table.
- * Currently supports only OFFLINE tables.
- * @return a map of segmentName to its metadata
- */
- public JsonNode getSegmentsMetadata(String tableNameWithType, List<String>
columns, int timeoutMs)
+ /**
+ * Builds one legacy per-segment metadata URL for every segment hosted on every server, optionally restricted
+ * to {@code segmentsFilter} (null/empty means all segments). {@code endpoints} maps server instance -> admin
+ * endpoint.
+ * NOTE(review): the controller forwards the "segments" query param {@code @Encoded}, while the names compared
+ * here come from getServerToSegmentsMap; segment names that change under URL encoding would not match — confirm.
+ */
+ private List<String> buildSegmentLevelUrls(Map<String, List<String>> serverToSegs, BiMap<String, String> endpoints,
+ String tableNameWithType, List<String> columns, List<String> segmentsFilter, ServerSegmentMetadataReader reader) {
+ // Hash the filter once instead of scanning the list for every hosted segment.
+ Set<String> filter =
+ (segmentsFilter == null || segmentsFilter.isEmpty()) ? null : new HashSet<>(segmentsFilter);
+ List<String> urls = new ArrayList<>();
+ for (Map.Entry<String, List<String>> serverEntry : serverToSegs.entrySet()) {
+ String endpoint = endpoints.get(serverEntry.getKey());
+ for (String segment : serverEntry.getValue()) {
+ if (filter == null || filter.contains(segment)) {
+ urls.add(reader.generateSegmentMetadataServerURL(tableNameWithType, segment, columns, endpoint));
+ }
+ }
+ }
+ return urls;
+ }
+
+ /**
+ * Fetches metadata for the requested segments of a table. It first tries one server-level request per server,
+ * and falls back to one legacy request per segment if any server-level request fails (e.g. a server that does
+ * not expose the new endpoint).
+ */
+ private JsonNode getSegmentsMetadataInternal(String tableNameWithType, @Nullable List<String> columns,
+ @Nullable List<String> segments, int timeoutMs)
throws InvalidConfigException, IOException {
- return getSegmentsMetadataInternal(tableNameWithType, columns, null,
timeoutMs);
+ Map<String, List<String>> serverToSegs = _pinotHelixResourceManager.getServerToSegmentsMap(tableNameWithType);
+ // server instance -> admin endpoint (the URL builders look endpoints up by server name).
+ BiMap<String, String> endpoints =
+ _pinotHelixResourceManager.getDataInstanceAdminEndpoints(serverToSegs.keySet());
+ // CompletionServiceHelper needs the endpoint -> server view. The legacy path already passed the inverse; the
+ // server-level path used to pass the non-inverted map, so the two call sites disagreed. Both now use this one.
+ BiMap<String, String> endpointsToServers = endpoints.inverse();
+ ServerSegmentMetadataReader reader = new ServerSegmentMetadataReader(_executor, _connectionManager);
+
+ // Try the server-level endpoint first: one request per server.
+ try {
+ List<String> tableUrls =
+ buildTableLevelUrls(serverToSegs, endpoints, tableNameWithType, columns, segments, reader);
+ return fetchAndAggregateMetadata(tableUrls, endpointsToServers, /*perSegmentJson=*/false, tableNameWithType,
+ timeoutMs);
+ } catch (RuntimeException e) {
+ log.warn("Failed to fetch table metadata for table {} using new server endpoint, falling back to legacy "
+ + "per-segment endpoint", tableNameWithType, e);
+ }
+
+ // Legacy endpoint: one request per segment.
+ List<String> segmentUrls =
+ buildSegmentLevelUrls(serverToSegs, endpoints, tableNameWithType, columns, segments, reader);
+ return fetchAndAggregateMetadata(segmentUrls, endpointsToServers, /*perSegmentJson=*/true, tableNameWithType,
+ timeoutMs);
}
/**
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
index 1c3e3e68bc6..300f9e4ae88 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/BaseClusterIntegrationTest.java
@@ -873,6 +873,14 @@ public abstract class BaseClusterIntegrationTest extends
ClusterTest {
.get("columnIndexSizeMap").get(column);
}
+ /**
+ * Get all segment names for a given tableName and tableType.
+ *
+ * @param tableName raw table name
+ * @param tableType table type as a string (e.g. "OFFLINE"), or null to not restrict the type
+ * @return segment names reported by the controller
+ * NOTE(review): the trailing {@code true} passed to listSegments is presumably "exclude replaced segments" —
+ * confirm against ControllerRequestClient.
+ */
+ protected List<String> getSegmentNames(String tableName, @Nullable String
tableType)
+ throws Exception {
+ return getControllerRequestClient().listSegments(tableName, tableType,
true);
+ }
+
protected List<ValidDocIdsMetadataInfo> getValidDocIdsMetadata(String
tableNameWithType,
ValidDocIdsType validDocIdsType)
throws Exception {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
index d60ce7b4282..57a7ad236d9 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HybridClusterIntegrationTest.java
@@ -19,7 +19,11 @@
package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
+import java.net.URLEncoder;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
@@ -108,16 +112,68 @@ public class HybridClusterIntegrationTest extends
BaseHybridClusterIntegrationTe
@Test
public void testSegmentMetadataApi()
throws Exception {
- String jsonOutputStr =
sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName()));
- JsonNode tableSegmentsMetadata = JsonUtils.stringToJsonNode(jsonOutputStr);
- Assert.assertEquals(tableSegmentsMetadata.size(), 8);
-
- JsonNode segmentMetadataFromAllEndpoint =
tableSegmentsMetadata.elements().next();
- String segmentName =
segmentMetadataFromAllEndpoint.get("segmentName").asText();
- jsonOutputStr =
sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(getTableName(),
segmentName));
- JsonNode segmentMetadataFromDirectEndpoint =
JsonUtils.stringToJsonNode(jsonOutputStr);
- Assert.assertEquals(segmentMetadataFromAllEndpoint.get("totalDocs"),
- segmentMetadataFromDirectEndpoint.get("segment.total.docs"));
+ {
+ // No column/segment filters: expect 8 entries, and the first must agree with the single-segment endpoint.
+ String jsonOutputStr =
sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName()));
+ JsonNode tableSegmentsMetadata =
JsonUtils.stringToJsonNode(jsonOutputStr);
+ Assert.assertEquals(tableSegmentsMetadata.size(), 8);
+
+ JsonNode segmentMetadataFromAllEndpoint =
tableSegmentsMetadata.elements().next();
+ String segmentName =
segmentMetadataFromAllEndpoint.get("segmentName").asText();
+ jsonOutputStr =
sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(getTableName(),
segmentName));
+ JsonNode segmentMetadataFromDirectEndpoint =
JsonUtils.stringToJsonNode(jsonOutputStr);
+ Assert.assertEquals(segmentMetadataFromAllEndpoint.get("totalDocs"),
+ segmentMetadataFromDirectEndpoint.get("segment.total.docs"));
+ }
+ // get list of segment names to pass in query params for following tests
+ // The controller reads the "segments" query param with @Encoded, so names are URL-encoded before sending.
+ List<String> segmentNames = getSegmentNames(getTableName(),
TableType.OFFLINE.toString());
+ List<String> segments = new ArrayList<>();
+ for (String segment : segmentNames) {
+ String encodedSegmentName = URLEncoder.encode(segment,
StandardCharsets.UTF_8.toString());
+ segments.add(encodedSegmentName);
+ }
+ // with null column params
+ {
+ String jsonOutputStr =
sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(),
+ null, segments));
+ JsonNode tableSegmentsMetadata =
JsonUtils.stringToJsonNode(jsonOutputStr);
+ Assert.assertEquals(tableSegmentsMetadata.size(), segments.size());
+ JsonNode segmentMetadataFromAllEndpoint =
tableSegmentsMetadata.elements().next();
+ String segmentName =
segmentMetadataFromAllEndpoint.get("segmentName").asText();
+ jsonOutputStr =
sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(getTableName(),
segmentName));
+ JsonNode segmentMetadataFromDirectEndpoint =
JsonUtils.stringToJsonNode(jsonOutputStr);
+ Assert.assertEquals(segmentMetadataFromAllEndpoint.get("totalDocs"),
+ segmentMetadataFromDirectEndpoint.get("segment.total.docs"));
+
Assert.assertEquals(tableSegmentsMetadata.get(segmentNames.get(0)).get("columns").size(),
0);
+ }
+ // with * column param
+ {
+ String jsonOutputStr =
sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(),
+ List.of("*"), segments));
+ JsonNode tableSegmentsMetadata =
JsonUtils.stringToJsonNode(jsonOutputStr);
+ Assert.assertEquals(tableSegmentsMetadata.size(), segments.size());
+ JsonNode segmentMetadataFromAllEndpoint =
tableSegmentsMetadata.elements().next();
+ String segmentName =
segmentMetadataFromAllEndpoint.get("segmentName").asText();
+ jsonOutputStr =
sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(getTableName(),
segmentName));
+ JsonNode segmentMetadataFromDirectEndpoint =
JsonUtils.stringToJsonNode(jsonOutputStr);
+ Assert.assertEquals(segmentMetadataFromAllEndpoint.get("totalDocs"),
+ segmentMetadataFromDirectEndpoint.get("segment.total.docs"));
+
+ // NOTE(review): 79 is presumably the physical column count of the test schema — confirm.
Assert.assertEquals(tableSegmentsMetadata.get(segmentNames.get(0)).get("columns").size(),
79);
+ }
+ // with specified column params
+ {
+ List<String> columns = List.of("Carrier", "FlightNum", "TailNum");
+ String jsonOutputStr =
sendGetRequest(_controllerRequestURLBuilder.forSegmentsMetadataFromServer(getTableName(),
+ columns, segments));
+ JsonNode tableSegmentsMetadata =
JsonUtils.stringToJsonNode(jsonOutputStr);
+ Assert.assertEquals(tableSegmentsMetadata.size(), segments.size());
+ JsonNode segmentMetadataFromAllEndpoint =
tableSegmentsMetadata.elements().next();
+ String segmentName =
segmentMetadataFromAllEndpoint.get("segmentName").asText();
+ jsonOutputStr =
sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(getTableName(),
segmentName));
+ JsonNode segmentMetadataFromDirectEndpoint =
JsonUtils.stringToJsonNode(jsonOutputStr);
+ Assert.assertEquals(segmentMetadataFromAllEndpoint.get("totalDocs"),
+ segmentMetadataFromDirectEndpoint.get("segment.total.docs"));
+
Assert.assertEquals(tableSegmentsMetadata.get(segmentNames.get(0)).get("columns").size(),
columns.size());
+ }
}
@Test
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
index f3e3f4405ba..0b4e3e97793 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/api/resources/TablesResource.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.server.api.resources;
+import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Preconditions;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiKeyAuthDefinition;
@@ -60,6 +61,7 @@ import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response;
import javax.ws.rs.core.StreamingOutput;
+import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.model.IdealState;
@@ -410,6 +412,64 @@ public class TablesResource {
}
}
+ @GET
+ @Encoded
+ @Path("/tables/{tableName}/segments/metadata")
+ @Produces(MediaType.APPLICATION_JSON)
+ @ApiOperation(value = "Provide segments metadata", notes = "Provide segments metadata for the segments on server")
+ @ApiResponses(value = {
+ @ApiResponse(code = 200, message = "Success"),
+ @ApiResponse(code = 500, message = "Internal server error", response = ErrorInfo.class),
+ @ApiResponse(code = 404, message = "Table or segment not found", response = ErrorInfo.class)
+ })
+ /**
+ * Returns a JSON object mapping segment name to segment metadata for the requested segments of a table hosted on
+ * this server (all hosted segments when "segments" is absent). Query params arrive URL-encoded (@Encoded) and are
+ * decoded here.
+ */
+ public String getSegmentsMetadata(
+ @ApiParam(value = "Table name including type", required = true, example = "myTable_OFFLINE")
+ @PathParam("tableName") String tableName,
+ @Nullable @ApiParam(value = "Segments name", allowMultiple = true) @QueryParam("segments") List<String> segments,
+ @Nullable @ApiParam(value = "Column name", allowMultiple = true) @QueryParam("columns") List<String> columns,
+ @Context HttpHeaders headers) {
+ tableName = DatabaseUtils.translateTableName(tableName, headers);
+ TableDataManager tableDataManager = ServerResourceUtils.checkGetTableDataManager(_serverInstance, tableName);
+ // Decode both params BEFORE acquiring segments: decoding can throw on malformed input, and previously the column
+ // decoding ran after acquisition but outside the try/finally, leaking the acquired segment references.
+ List<String> decodedSegments = decodeAll(segments);
+ List<String> decodedColumns = decodeAll(columns);
+ // The controller sends the same segment filter to every server; segments not hosted here are presumably
+ // reported through the (discarded) missing-segments list rather than failing the request.
+ List<SegmentDataManager> segmentDataManagers = decodedSegments.isEmpty()
+ ? tableDataManager.acquireAllSegments()
+ : tableDataManager.acquireSegments(decodedSegments, new ArrayList<>());
+ Map<String, JsonNode> response = new HashMap<>();
+ try {
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ String segmentMetadata = SegmentMetadataFetcher.getSegmentMetadata(segmentDataManager, decodedColumns);
+ response.put(segmentDataManager.getSegmentName(), JsonUtils.stringToJsonNode(segmentMetadata));
+ }
+ } catch (Exception e) {
+ LOGGER.error("Failed to convert table {} segments to json", tableName, e);
+ throw new WebApplicationException("Failed to convert segment metadata to json", e,
+ Response.Status.INTERNAL_SERVER_ERROR);
+ } finally {
+ for (SegmentDataManager segmentDataManager : segmentDataManagers) {
+ tableDataManager.releaseSegment(segmentDataManager);
+ }
+ }
+ return ResourceUtils.convertToJsonString(response);
+ }
+
+ /** URL-decodes each value; returns an empty list when {@code encodedValues} is null or empty. */
+ private static List<String> decodeAll(@Nullable List<String> encodedValues) {
+ List<String> decoded = new ArrayList<>();
+ if (CollectionUtils.isNotEmpty(encodedValues)) {
+ for (String value : encodedValues) {
+ decoded.add(URIUtils.decode(value));
+ }
+ }
+ return decoded;
+ }
+
@GET
@Path("/tables/{tableName}/segments/crc")
@Produces(MediaType.APPLICATION_JSON)
diff --git
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
index 42c78660142..eb3fa5298e8 100644
---
a/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
+++
b/pinot-server/src/test/java/org/apache/pinot/server/api/TablesResourceTest.java
@@ -241,6 +241,50 @@ public class TablesResourceTest extends BaseResourceTest {
Assert.assertEquals(response.getStatus(),
Response.Status.NOT_FOUND.getStatusCode());
}
+ @Test
+ public void testSegmentsMetadata()
+ throws Exception {
+ IndexSegment defaultSegment = _realtimeIndexSegments.get(0);
+ String segmentMetadataPath =
+ "/tables/" + TableNameBuilder.REALTIME.tableNameWithType(TABLE_NAME) + "/segments/metadata";
+ String segmentName = defaultSegment.getSegmentName();
+
+ // The endpoint filters on the "segments" query param (not "segmentsToInclude", which it silently ignores);
+ // only the requested segment may be returned.
+ JsonNode jsonResponse = JsonUtils.stringToJsonNode(
+ _webTarget.path(segmentMetadataPath).queryParam("segments", segmentName).request().get(String.class));
+ Assert.assertEquals(jsonResponse.size(), 1);
+ JsonNode jsonNode = jsonResponse.get(segmentName);
+ SegmentMetadata segmentMetadata = defaultSegment.getSegmentMetadata();
+ Assert.assertEquals(jsonNode.get("segmentName").asText(), segmentMetadata.getName());
+ Assert.assertEquals(jsonNode.get("crc").asText(), segmentMetadata.getCrc());
+ Assert.assertEquals(jsonNode.get("creationTimeMillis").asLong(), segmentMetadata.getIndexCreationTime());
+ Assert.assertTrue(jsonNode.has("startTimeReadable"));
+ Assert.assertTrue(jsonNode.has("endTimeReadable"));
+ Assert.assertTrue(jsonNode.has("creationTimeReadable"));
+ // No columns requested -> no per-column metadata.
+ Assert.assertEquals(jsonNode.get("columns").size(), 0);
+ Assert.assertEquals(jsonNode.get("indexes").size(), 0);
+
+ jsonResponse = JsonUtils.stringToJsonNode(
+ _webTarget.path(segmentMetadataPath).queryParam("columns", "column1").queryParam("columns", "column2")
+ .queryParam("segments", segmentName).request().get(String.class));
+ Assert.assertEquals(jsonResponse.size(), 1);
+ jsonNode = jsonResponse.get(segmentName);
+ Assert.assertEquals(jsonNode.get("columns").size(), 2);
+ Assert.assertEquals(jsonNode.get("indexes").size(), 2);
+ Assert.assertNotNull(jsonNode.get("columns").get(0).get("indexSizeMap"));
+ Assert.assertNotNull(jsonNode.get("columns").get(1).get("indexSizeMap"));
+ Assert.assertEquals(jsonNode.get("indexes").get("column1").get("h3-index").asText(), "NO");
+ Assert.assertEquals(jsonNode.get("indexes").get("column1").get("fst-index").asText(), "NO");
+ Assert.assertEquals(jsonNode.get("indexes").get("column1").get("text-index").asText(), "NO");
+ Assert.assertEquals(jsonNode.get("indexes").get("column2").get("h3-index").asText(), "NO");
+ Assert.assertEquals(jsonNode.get("indexes").get("column2").get("fst-index").asText(), "NO");
+ Assert.assertEquals(jsonNode.get("indexes").get("column2").get("text-index").asText(), "NO");
+
+ // "*" requests every physical column.
+ jsonResponse = JsonUtils.stringToJsonNode(_webTarget.path(segmentMetadataPath)
+ .queryParam("columns", "*").queryParam("segments", segmentName).request().get(String.class));
+ Assert.assertEquals(jsonResponse.size(), 1);
+ int physicalColumnCount = defaultSegment.getPhysicalColumnNames().size();
+ jsonNode = jsonResponse.get(segmentName);
+ Assert.assertEquals(jsonNode.get("columns").size(), physicalColumnCount);
+ Assert.assertEquals(jsonNode.get("indexes").size(), physicalColumnCount);
+ }
+
@Test
public void testSegmentCrcMetadata()
throws Exception {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
index d6efa38cb82..1413a367954 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/builder/ControllerRequestURLBuilder.java
@@ -20,6 +20,7 @@ package org.apache.pinot.spi.utils.builder;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@@ -361,6 +362,26 @@ public class ControllerRequestURLBuilder {
}
}
+ /**
+ * Renders {@code queryParams} as a query string: "" when the map is empty, otherwise "?k1=a&k1=b&k2=c" in the
+ * map's iteration order (each value of a key becomes its own repeated parameter). Keys and values are appended
+ * verbatim, without URL encoding, so callers must pre-encode values that need it.
+ */
+ private String constructQueryParametersString(Map<String, List<String>>
queryParams) {
+ if (queryParams.isEmpty()) {
+ return "";
+ }
+ StringBuilder query = new StringBuilder("?");
+ boolean firstParam = true;
+ for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
+ String key = entry.getKey();
+ for (String value : entry.getValue()) {
+ if (!firstParam) {
+ query.append("&");
+ }
+ query.append(key).append("=").append(value);
+ firstParam = false;
+ }
+ }
+ return query.toString();
+ }
+
+
public String forSchemaValidate() {
return StringUtil.join("/", _baseUrl, "schemas", "validate");
}
@@ -445,7 +466,21 @@ public class ControllerRequestURLBuilder {
}
public String forSegmentsMetadataFromServer(String tableName, @Nullable
List<String> columns) {
- return StringUtil.join("/", _baseUrl, "segments", tableName, "metadata") +
constructColumnsParameter(columns);
+ return forSegmentsMetadataFromServer(tableName, columns, null);
+ }
+
+ /**
+ * Builds {@code <base>/segments/<tableName>/metadata} with optional repeated "columns" and "segments" query
+ * parameters (columns first); null/empty lists are omitted. Values are appended verbatim by
+ * constructQueryParametersString, so callers must URL-encode segment names that need it.
+ */
+ public String forSegmentsMetadataFromServer(String tableName, @Nullable
List<String> columns,
+ @Nullable List<String> segments) {
+ String basePath = StringUtil.join("/", _baseUrl, "segments", tableName,
"metadata");
+ // LinkedHashMap keeps "columns" ahead of "segments" in the generated query string.
+ Map<String, List<String>> queryParams = new LinkedHashMap<>();
+ if (!CollectionUtils.isEmpty(columns)) {
+ queryParams.put("columns", columns);
+ }
+ if (!CollectionUtils.isEmpty(segments)) {
+ queryParams.put("segments", segments);
+ }
+ String queryString = constructQueryParametersString(queryParams);
+ return basePath + queryString;
}
public String forSegmentMetadata(String tableName, String segmentName) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]