This is an automated email from the ASF dual-hosted git repository.
ankitsultana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 078c47a63f [timeseries] Adding requester level access control check
for timeseries queries (#16066)
078c47a63f is described below
commit 078c47a63f62ff7a39248860a9cd803a9fdd503f
Author: Shaurya Chaturvedi <[email protected]>
AuthorDate: Fri Jun 13 13:04:29 2025 -0700
[timeseries] Adding requester level access control check for timeseries
queries (#16066)
Co-authored-by: shauryachats <[email protected]>
---
.../broker/api/resources/PinotClientRequest.java | 8 +-
.../requesthandler/BrokerRequestHandler.java | 2 +-
.../BrokerRequestHandlerDelegate.java | 5 +-
.../requesthandler/TimeSeriesRequestHandler.java | 24 ++-
.../pinot/integration/tests/ClusterTest.java | 4 +-
.../tests/TimeSeriesAuthIntegrationTest.java | 177 +++++++++++++++++++++
.../tests/TimeSeriesIntegrationTest.java | 8 +-
7 files changed, 218 insertions(+), 10 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
index 0428d107cc..ad28f32733 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/api/resources/PinotClientRequest.java
@@ -78,6 +78,7 @@ import
org.apache.pinot.core.query.executor.sql.SqlQueryExecutor;
import org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.query.request.context.utils.QueryContextUtils;
+import org.apache.pinot.spi.auth.broker.RequesterIdentity;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.query.QueryThreadContext;
@@ -292,7 +293,8 @@ public class PinotClientRequest {
try {
try (RequestScope requestContext =
Tracing.getTracer().createRequestScope()) {
String queryString = requestCtx.getQueryString();
- PinotBrokerTimeSeriesResponse response =
executeTimeSeriesQuery(language, queryString, requestContext);
+ PinotBrokerTimeSeriesResponse response =
executeTimeSeriesQuery(language, queryString, requestContext,
+ makeHttpIdentity(requestCtx));
if (response.getErrorType() != null &&
!response.getErrorType().isEmpty()) {
asyncResponse.resume(Response.serverError().entity(response).build());
return;
@@ -536,8 +538,8 @@ public class PinotClientRequest {
}
private PinotBrokerTimeSeriesResponse executeTimeSeriesQuery(String
language, String queryString,
- RequestContext requestContext) {
- return _requestHandler.handleTimeSeriesRequest(language, queryString,
requestContext);
+ RequestContext requestContext, RequesterIdentity requesterIdentity) {
+ return _requestHandler.handleTimeSeriesRequest(language, queryString,
requestContext, requesterIdentity);
}
public static HttpRequesterIdentity
makeHttpIdentity(org.glassfish.grizzly.http.server.Request context) {
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
index 9dfb43ace9..d4d143747a 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandler.java
@@ -65,7 +65,7 @@ public interface BrokerRequestHandler {
* Run a query and use the time-series engine.
*/
default PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang,
String rawQueryParamString,
- RequestContext requestContext) {
+ RequestContext requestContext, @Nullable RequesterIdentity
requesterIdentity) {
throw new UnsupportedOperationException("Handler does not support Time
Series requests");
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
index c2173b3728..4a7929966e 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BrokerRequestHandlerDelegate.java
@@ -125,9 +125,10 @@ public class BrokerRequestHandlerDelegate implements
BrokerRequestHandler {
@Override
public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang,
String rawQueryParamString,
- RequestContext requestContext) {
+ RequestContext requestContext, RequesterIdentity requesterIdentity) {
if (_timeSeriesRequestHandler != null) {
- return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang,
rawQueryParamString, requestContext);
+ return _timeSeriesRequestHandler.handleTimeSeriesRequest(lang,
rawQueryParamString, requestContext,
+ requesterIdentity);
}
return new PinotBrokerTimeSeriesResponse("error", null, "error", "Time
series query engine not enabled.");
}
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
index 137700b374..7a6ff11147 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/TimeSeriesRequestHandler.java
@@ -29,7 +29,9 @@ import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
+import javax.ws.rs.WebApplicationException;
import javax.ws.rs.core.HttpHeaders;
+import javax.ws.rs.core.Response;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
import org.apache.http.NameValuePair;
@@ -45,6 +47,7 @@ import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.PinotBrokerTimeSeriesResponse;
import org.apache.pinot.common.utils.HumanReadableDuration;
import org.apache.pinot.query.service.dispatch.QueryDispatcher;
+import org.apache.pinot.spi.auth.AuthorizationResult;
import org.apache.pinot.spi.auth.broker.RequesterIdentity;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.trace.RequestContext;
@@ -101,7 +104,7 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
@Override
public PinotBrokerTimeSeriesResponse handleTimeSeriesRequest(String lang,
String rawQueryParamString,
- RequestContext requestContext) {
+ RequestContext requestContext, RequesterIdentity requesterIdentity) {
PinotBrokerTimeSeriesResponse timeSeriesResponse = null;
long queryStartTime = System.currentTimeMillis();
try {
@@ -109,6 +112,7 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
requestContext.setBrokerId(_brokerId);
requestContext.setRequestId(_requestIdGenerator.get());
RangeTimeSeriesRequest timeSeriesRequest = null;
+ firstStageAccessControlCheck(requesterIdentity);
try {
timeSeriesRequest = buildRangeTimeSeriesRequest(lang,
rawQueryParamString);
} catch (URISyntaxException e) {
@@ -219,4 +223,22 @@ public class TimeSeriesRequestHandler extends
BaseBrokerRequestHandler {
}
return HumanReadableDuration.from(step).getSeconds();
}
+
+ /**
+ * First-stage access control check for the request.
+ * This method checks if the requester has access to the broker to prevent
unauthenticated requests from
+ * using up resources.
+ * Secondary table-level access control checks will be performed later.
+ *
+ * @param requesterIdentity The identity of the requester.
+ */
+ private void firstStageAccessControlCheck(RequesterIdentity
requesterIdentity) {
+ AccessControl accessControl = _accessControlFactory.create();
+ AuthorizationResult authorizationResult =
accessControl.authorize(requesterIdentity);
+ if (!authorizationResult.hasAccess()) {
+
_brokerMetrics.addMeteredGlobalValue(BrokerMeter.REQUEST_DROPPED_DUE_TO_ACCESS_ERROR,
1);
+ throw new WebApplicationException("Permission denied. " +
authorizationResult.getFailureMessage(),
+ Response.Status.FORBIDDEN);
+ }
+ }
}
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index e1fc48bdb0..01442e4048 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -555,12 +555,12 @@ public abstract class ClusterTest extends ControllerTest {
* Queries the broker's timeseries query endpoint
(/timeseries/api/v1/query_range).
* This is used for testing timeseries queries.
*/
- public JsonNode getTimeseriesQuery(String query, long startTime, long
endTime) {
+ public JsonNode getTimeseriesQuery(String query, long startTime, long
endTime, Map<String, String> headers) {
try {
Map<String, String> queryParams = Map.of("language", "m3ql", "query",
query, "start",
String.valueOf(startTime), "end", String.valueOf(endTime));
String url =
buildQueryUrl(getTimeSeriesQueryApiUrl(getBrokerBaseApiUrl()), queryParams);
- JsonNode responseJsonNode =
JsonUtils.stringToJsonNode(sendGetRequest(url, Map.of()));
+ JsonNode responseJsonNode =
JsonUtils.stringToJsonNode(sendGetRequest(url, headers));
return sanitizeResponse(responseJsonNode);
} catch (Exception e) {
throw new RuntimeException("Failed to get timeseries query: " + query,
e);
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesAuthIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesAuthIntegrationTest.java
new file mode 100644
index 0000000000..9c4e885f37
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesAuthIntegrationTest.java
@@ -0,0 +1,177 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import org.apache.hc.core5.http.Header;
+import org.apache.hc.core5.http.NameValuePair;
+import org.apache.hc.core5.http.message.BasicHeader;
+import org.apache.hc.core5.http.message.BasicNameValuePair;
+import org.apache.http.HttpStatus;
+import org.apache.pinot.client.Connection;
+import org.apache.pinot.client.ConnectionFactory;
+import org.apache.pinot.client.JsonAsyncHttpPinotClientTransportFactory;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.utils.FileUploadDownloadClient;
+import org.apache.pinot.common.utils.URIUtils;
+import org.apache.pinot.common.utils.http.HttpClient;
+import org.apache.pinot.controller.helix.ControllerRequestClient;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static
org.apache.pinot.integration.tests.BasicAuthTestUtils.AUTH_HEADER;
+import static org.apache.pinot.integration.tests.BasicAuthTestUtils.AUTH_TOKEN;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+
+
+public class TimeSeriesAuthIntegrationTest extends TimeSeriesIntegrationTest {
+ protected static final Logger LOGGER =
LoggerFactory.getLogger(TimeSeriesAuthIntegrationTest.class);
+
+ @Override
+ protected Map<String, String> getHeaders() {
+ return BasicAuthTestUtils.AUTH_HEADER;
+ }
+
+ @Override
+ public ControllerRequestClient getControllerRequestClient() {
+ if (_controllerRequestClient == null) {
+ _controllerRequestClient =
+ new ControllerRequestClient(_controllerRequestURLBuilder,
getHttpClient(), AUTH_HEADER);
+ }
+ return _controllerRequestClient;
+ }
+
+ @Override
+ protected Connection getPinotConnection() {
+ if (_pinotConnection == null) {
+ JsonAsyncHttpPinotClientTransportFactory factory = new
JsonAsyncHttpPinotClientTransportFactory();
+ factory.setHeaders(AUTH_HEADER);
+
+ _pinotConnection =
+ ConnectionFactory.fromZookeeper(getZkUrl() + "/" +
getHelixClusterName(), factory.buildTransport());
+ }
+ return _pinotConnection;
+ }
+
+ @Override
+ protected void overrideControllerConf(Map<String, Object> properties) {
+ BasicAuthTestUtils.addControllerConfiguration(properties);
+ }
+
+ @Override
+ protected void overrideBrokerConf(PinotConfiguration brokerConf) {
+ super.overrideBrokerConf(brokerConf);
+ BasicAuthTestUtils.addBrokerConfiguration(brokerConf);
+ }
+
+ @Override
+ protected void overrideServerConf(PinotConfiguration serverConf) {
+ super.overrideServerConf(serverConf);
+ BasicAuthTestUtils.addServerConfiguration(serverConf);
+ }
+
+ // TODO: Enhance base method to support custom headers.
+ @Override
+ protected void uploadSegments(String tableName, TableType tableType,
List<File> tarDirs)
+ throws Exception {
+ List<File> segmentTarFiles = new ArrayList<>();
+ for (File tarDir : tarDirs) {
+ File[] tarFiles = tarDir.listFiles();
+ assertNotNull(tarFiles);
+ Collections.addAll(segmentTarFiles, tarFiles);
+ }
+ int numSegments = segmentTarFiles.size();
+ assertTrue(numSegments > 0);
+
+ URI uploadSegmentHttpURI =
URI.create(getControllerRequestURLBuilder().forSegmentUpload());
+ NameValuePair
+ tableNameValuePair = new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
tableName);
+ NameValuePair tableTypeValuePair = new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
+ tableType.name());
+ List<NameValuePair> parameters = Arrays.asList(tableNameValuePair,
tableTypeValuePair);
+ List<Header> headers = List.of(new BasicHeader("Authorization",
AUTH_TOKEN));
+
+ try (FileUploadDownloadClient fileUploadDownloadClient = new
FileUploadDownloadClient()) {
+ if (numSegments == 1) {
+ File segmentTarFile = segmentTarFiles.get(0);
+ if (System.currentTimeMillis() % 2 == 0) {
+ assertEquals(
+ fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI,
segmentTarFile.getName(), segmentTarFile,
+ headers, parameters,
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode(), HttpStatus.SC_OK);
+ } else {
+ assertEquals(
+ uploadSegmentWithOnlyMetadata(tableName, tableType,
uploadSegmentHttpURI, fileUploadDownloadClient,
+ segmentTarFile), HttpStatus.SC_OK);
+ }
+ } else {
+ // Upload all segments in parallel
+ ExecutorService executorService =
Executors.newFixedThreadPool(numSegments);
+ List<Future<Integer>> futures = new ArrayList<>(numSegments);
+ for (File segmentTarFile : segmentTarFiles) {
+ futures.add(executorService.submit(() -> {
+ if (System.currentTimeMillis() % 2 == 0) {
+ return
fileUploadDownloadClient.uploadSegment(uploadSegmentHttpURI,
segmentTarFile.getName(),
+ segmentTarFile, headers, parameters,
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
+ } else {
+ return uploadSegmentWithOnlyMetadata(tableName, tableType,
uploadSegmentHttpURI, fileUploadDownloadClient,
+ segmentTarFile);
+ }
+ }));
+ }
+ executorService.shutdown();
+ for (Future<Integer> future : futures) {
+ assertEquals((int) future.get(), HttpStatus.SC_OK);
+ }
+ }
+ }
+ }
+
+ private int uploadSegmentWithOnlyMetadata(String tableName, TableType
tableType, URI uploadSegmentHttpURI,
+ FileUploadDownloadClient fileUploadDownloadClient, File segmentTarFile)
+ throws IOException, HttpErrorStatusException {
+ List<Header> headers = List.of(new
BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI,
+ String.format("file://%s/%s",
segmentTarFile.getParentFile().getAbsolutePath(),
+ URIUtils.encode(segmentTarFile.getName()))),
+ new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE,
+ FileUploadDownloadClient.FileUploadType.METADATA.toString()),
+ new BasicHeader("Authorization", AUTH_TOKEN));
+ // Add table name and table type as request parameters
+ NameValuePair tableNameValuePair =
+ new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME,
tableName);
+ NameValuePair tableTypeValuePair =
+ new
BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE,
tableType.name());
+ List<NameValuePair> parameters = Arrays.asList(tableNameValuePair,
tableTypeValuePair);
+ return
fileUploadDownloadClient.uploadSegmentMetadata(uploadSegmentHttpURI,
segmentTarFile.getName(),
+ segmentTarFile, headers, parameters,
HttpClient.DEFAULT_SOCKET_TIMEOUT_MS).getStatusCode();
+ }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
index 58ac3fcc0f..98de16b5dc 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TimeSeriesIntegrationTest.java
@@ -21,6 +21,8 @@ package org.apache.pinot.integration.tests;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import java.io.File;
+import java.util.Collections;
+import java.util.Map;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
@@ -164,8 +166,12 @@ public class TimeSeriesIntegrationTest extends
BaseClusterIntegrationTest {
);
}
+ protected Map<String, String> getHeaders() {
+ return Collections.emptyMap();
+ }
+
private void runGroupedTimeSeriesQuery(String query, int expectedGroups,
TimeSeriesValidator validator) {
- JsonNode result = getTimeseriesQuery(query, QUERY_START_TIME_SEC,
QUERY_END_TIME_SEC);
+ JsonNode result = getTimeseriesQuery(query, QUERY_START_TIME_SEC,
QUERY_END_TIME_SEC, getHeaders());
System.out.println(result);
assertEquals(result.get("status").asText(), "success");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org