This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 285a18b8442 broker/server: add ignoreMissingSegments query option with
broker config; skip SERVER_SEGMENT_MISSING errors when enabled; add SSE/MSE
wiring and unit tests (#16556)
285a18b8442 is described below
commit 285a18b84427e7e8e7e60ea8970e9fc2d5d325da
Author: Xiang Fu <[email protected]>
AuthorDate: Sun Aug 17 23:57:54 2025 -0700
broker/server: add ignoreMissingSegments query option with broker config;
skip SERVER_SEGMENT_MISSING errors when enabled; add SSE/MSE wiring and unit
tests (#16556)
---
.../BaseSingleStageBrokerRequestHandler.java | 11 +++++
.../MultiStageBrokerRequestHandler.java | 40 +++++++++-------
.../common/utils/config/QueryOptionsUtils.java | 4 ++
.../common/utils/config/QueryOptionsUtilsTest.java | 13 ++++++
.../query/executor/ServerQueryExecutorV1Impl.java | 2 +-
.../core/query/reduce/BrokerReduceService.java | 8 ++++
.../core/query/reduce/StreamingReduceService.java | 8 ++++
.../executor/QueryExecutorExceptionsTest.java | 25 ++++++++++
.../core/query/reduce/BrokerReduceServiceTest.java | 44 ++++++++++++++++++
.../query/reduce/StreamingReduceServiceTest.java | 54 ++++++++++++++++++++++
.../apache/pinot/spi/utils/CommonConstants.java | 10 ++++
11 files changed, 201 insertions(+), 18 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index c829ae8427b..199fbdf9d76 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -745,6 +745,8 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
serverBrokerRequest.getPinotQuery().getQueryOptions()
.put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
}
+ // Optionally set ignoreMissingSegments based on broker config
+ setIgnoreMissingSegmentsIfConfigured(queryOptions);
}
}
if (realtimeBrokerRequest != null) {
@@ -758,6 +760,8 @@ public abstract class BaseSingleStageBrokerRequestHandler
extends BaseBrokerRequ
serverBrokerRequest.getPinotQuery().getQueryOptions()
.put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
}
+ // Optionally set ignoreMissingSegments based on broker config
+ setIgnoreMissingSegmentsIfConfigured(queryOptions);
}
}
@@ -2082,6 +2086,13 @@ public abstract class
BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
return _brokerId + "_" + requestId;
}
+ private void setIgnoreMissingSegmentsIfConfigured(Map<String, String>
queryOptions) {
+ if
(_config.getProperty(CommonConstants.Broker.CONFIG_OF_IGNORE_MISSING_SEGMENTS,
+ CommonConstants.Broker.DEFAULT_IGNORE_MISSING_SEGMENTS)) {
+ queryOptions.putIfAbsent(QueryOptionKey.IGNORE_MISSING_SEGMENTS, "true");
+ }
+ }
+
/**
* Helper class to pass the per server statistics.
*/
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index f841e46aafc..a366fd07144 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -120,7 +120,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
/// </AppenderRef>
/// </Logger>
/// </Loggers>
- /// ```
+ ///```
private static final Marker MSE_STATS_MARKER =
MarkerFactory.getMarker("MSE_STATS_MARKER");
private static final int NUM_UNAVAILABLE_SEGMENTS_TO_LOG = 10;
@@ -166,9 +166,9 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
CommonConstants.MultiStageQueryRunner.DEFAULT_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN);
_queryThrottler = queryThrottler;
_queryCompileExecutor = QueryThreadContext.contextAwareExecutorService(
- Executors.newFixedThreadPool(
- Math.max(1, Runtime.getRuntime().availableProcessors() / 2),
- new NamedThreadFactory("multi-stage-query-compile-executor")));
+ Executors.newFixedThreadPool(
+ Math.max(1, Runtime.getRuntime().availableProcessors() / 2),
+ new NamedThreadFactory("multi-stage-query-compile-executor")));
}
@Override
@@ -442,7 +442,6 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
return passiveTimeoutMsFromQueryOption != null ?
passiveTimeoutMsFromQueryOption : _extraPassiveTimeoutMs;
}
-
/**
* Explains the query and returns the broker response.
*
@@ -463,7 +462,6 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
QueryEnvironment.QueryPlannerResult queryPlanResult = callAsync(requestId,
query.getTextQuery(),
() -> query.explain(requestId, fragmentToPlanNode), timer);
String plan = queryPlanResult.getExplainPlan();
- Set<String> tableNames = queryPlanResult.getTableNames();
Map<String, String> extraFields = queryPlanResult.getExtraFields();
return constructMultistageExplainPlan(query.getTextQuery(), plan,
extraFields);
}
@@ -477,6 +475,12 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();
+ // Optionally set ignoreMissingSegments query option based on broker
config if not already set.
+ if
(_config.getProperty(CommonConstants.Broker.CONFIG_OF_IGNORE_MISSING_SEGMENTS,
+ CommonConstants.Broker.DEFAULT_IGNORE_MISSING_SEGMENTS)) {
+
query.getOptions().putIfAbsent(CommonConstants.Broker.Request.QueryOptionKey.IGNORE_MISSING_SEGMENTS,
"true");
+ }
+
Set<QueryServerInstance> servers = new HashSet<>();
for (DispatchablePlanFragment planFragment :
dispatchableSubPlan.getQueryStageMap().values()) {
servers.addAll(planFragment.getServerInstances());
@@ -537,7 +541,7 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
QueryDispatcher.QueryResult queryResults;
try {
queryResults = _queryDispatcher.submitAndReduce(requestContext,
dispatchableSubPlan, timer.getRemainingTimeMs(),
- query.getOptions());
+ query.getOptions());
} catch (QueryException e) {
throw e;
} catch (Throwable t) {
@@ -581,17 +585,19 @@ public class MultiStageBrokerRequestHandler extends
BaseBrokerRequestHandler {
brokerResponse.setNumServersQueried(servers.size() - 1);
brokerResponse.setNumServersResponded(servers.size() - 1);
- // Attach unavailable segments
+ // Attach unavailable segments (unless configured to ignore missing
segments)
int numUnavailableSegments = 0;
- for (Map.Entry<String, Set<String>> entry :
dispatchableSubPlan.getTableToUnavailableSegmentsMap().entrySet()) {
- String tableName = entry.getKey();
- Set<String> unavailableSegments = entry.getValue();
- int unavailableSegmentsInSubPlan = unavailableSegments.size();
- numUnavailableSegments += unavailableSegmentsInSubPlan;
- QueryProcessingException errMsg = new
QueryProcessingException(QueryErrorCode.SERVER_SEGMENT_MISSING,
- "Found " + unavailableSegmentsInSubPlan + " unavailable segments
for table " + tableName + ": "
- + toSizeLimitedString(unavailableSegments,
NUM_UNAVAILABLE_SEGMENTS_TO_LOG));
- brokerResponse.addException(errMsg);
+ if (!QueryOptionsUtils.isIgnoreMissingSegments(query.getOptions())) {
+ for (Map.Entry<String, Set<String>> entry :
dispatchableSubPlan.getTableToUnavailableSegmentsMap().entrySet()) {
+ String tableName = entry.getKey();
+ Set<String> unavailableSegments = entry.getValue();
+ int unavailableSegmentsInSubPlan = unavailableSegments.size();
+ numUnavailableSegments += unavailableSegmentsInSubPlan;
+ QueryProcessingException errMsg = new
QueryProcessingException(QueryErrorCode.SERVER_SEGMENT_MISSING,
+ "Found " + unavailableSegmentsInSubPlan + " unavailable segments
for table " + tableName + ": "
+ + toSizeLimitedString(unavailableSegments,
NUM_UNAVAILABLE_SEGMENTS_TO_LOG));
+ brokerResponse.addException(errMsg);
+ }
}
requestContext.setNumUnavailableSegments(numUnavailableSegments);
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 01acd526274..43b9036e741 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -441,6 +441,10 @@ public class QueryOptionsUtils {
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UNAVAILABLE_SERVERS));
}
+ public static boolean isIgnoreMissingSegments(Map<String, String>
queryOptions) {
+ return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IGNORE_MISSING_SEGMENTS));
+ }
+
public static boolean isSecondaryWorkload(Map<String, String> queryOptions) {
return
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD));
}
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
index b2ca6573b63..5e32fc83fbb 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
@@ -61,6 +61,19 @@ public class QueryOptionsUtilsTest {
assertEquals(resolved.get(USE_MULTISTAGE_ENGINE), "false");
}
+ @Test
+ public void shouldReadIgnoreMissingSegmentsOption() {
+ // Given:
+ Map<String, String> optsTrue = Map.of(IGNORE_MISSING_SEGMENTS, "true");
+ Map<String, String> optsFalse = Map.of(IGNORE_MISSING_SEGMENTS, "false");
+ Map<String, String> optsMissing = Map.of();
+
+ // Then:
+
org.testng.Assert.assertTrue(QueryOptionsUtils.isIgnoreMissingSegments(optsTrue));
+
org.testng.Assert.assertFalse(QueryOptionsUtils.isIgnoreMissingSegments(optsFalse));
+
org.testng.Assert.assertFalse(QueryOptionsUtils.isIgnoreMissingSegments(optsMissing));
+ }
+
@Test
public void testSkipIndexesParsing() {
String skipIndexesStr = "col1=inverted,range&col2=sorted";
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index d63710dd3ff..ba07d3df1d8 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -278,7 +278,7 @@ public class ServerQueryExecutorV1Impl implements
QueryExecutor {
List<String> missingSegments = executionInfo.getMissingSegments();
int numMissingSegments = missingSegments.size();
- if (numMissingSegments > 0) {
+ if ((numMissingSegments > 0) &&
(!QueryOptionsUtils.isIgnoreMissingSegments(queryContext.getQueryOptions()))) {
instanceResponse.addException(QueryErrorCode.SERVER_SEGMENT_MISSING,
numMissingSegments + " segments " + missingSegments + " missing on
server: "
+ _instanceDataManager.getInstanceId());
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 781dbd9b1df..498fd4fd8e1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -128,6 +128,14 @@ public class BrokerReduceService extends BaseReduceService
{
// Set execution statistics and Update broker metrics.
aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);
+ // If configured, filter out SERVER_SEGMENT_MISSING exceptions emitted by
servers. This must happen after
+ // aggregator.setStats(), because the aggregator appends server exceptions
during setStats.
+ Map<String, String> brokerQueryOptions =
brokerRequest.getPinotQuery().getQueryOptions();
+ if (brokerQueryOptions != null &&
QueryOptionsUtils.isIgnoreMissingSegments(brokerQueryOptions)) {
+ brokerResponseNative.getExceptions().removeIf(
+ ex -> ex.getErrorCode() ==
QueryErrorCode.SERVER_SEGMENT_MISSING.getId());
+ }
+
// Report the servers with conflicting data schema.
if (!serversWithConflictingDataSchema.isEmpty()) {
QueryErrorCode errorCode = QueryErrorCode.MERGE_RESPONSE;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
index 8b61a97d554..9802b7d925c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
@@ -38,6 +38,7 @@ import
org.apache.pinot.core.query.request.context.QueryContext;
import
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
@@ -115,6 +116,13 @@ public class StreamingReduceService extends
BaseReduceService {
// Set execution statistics and Update broker metrics.
aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);
+ // If configured, filter out SERVER_SEGMENT_MISSING exceptions emitted by
servers. This must happen after
+ // aggregator.setStats(), because the aggregator appends server exceptions
during setStats.
+ if (queryOptions != null &&
QueryOptionsUtils.isIgnoreMissingSegments(queryOptions)) {
+ brokerResponseNative.getExceptions().removeIf(
+ ex -> ex.getErrorCode() ==
QueryErrorCode.SERVER_SEGMENT_MISSING.getId());
+ }
+
updateAlias(queryContext, brokerResponseNative);
return brokerResponseNative;
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index 7c9bd2e4f49..ca98739e4da 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -55,6 +55,7 @@ import org.apache.pinot.spi.data.readers.FileFormat;
import org.apache.pinot.spi.env.CommonsConfigurationUtils;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.ReadMode;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -171,6 +172,30 @@ public class QueryExecutorExceptionsTest {
assertEqualsNoOrder(actualMissingSegments, expectedMissingSegments);
}
+ /**
+ * When ignoreMissingSegments is set in queryOptions, the server should not
populate SERVER_SEGMENT_MISSING exception.
+ */
+ @Test
+ public void testServerSegmentMissingExceptionIgnoredByOption() {
+ String query = "SELECT COUNT(*) FROM " + OFFLINE_TABLE_NAME;
+ // 1) Without the option -> we should see SERVER_SEGMENT_MISSING
+ InstanceRequest instanceRequestNoOpt = new InstanceRequest(0L,
CalciteSqlCompiler.compileToBrokerRequest(query));
+ instanceRequestNoOpt.setSearchSegments(_segmentNames);
+ InstanceResponseBlock responseNoOpt =
_queryExecutor.execute(getQueryRequest(instanceRequestNoOpt), QUERY_RUNNERS);
+ Map<Integer, String> exceptionsNoOpt = responseNoOpt.getExceptions();
+
assertTrue(exceptionsNoOpt.containsKey(QueryErrorCode.SERVER_SEGMENT_MISSING.getId()));
+
+ // 2) With ignoreMissingSegments=true -> we should NOT see
SERVER_SEGMENT_MISSING
+ InstanceRequest instanceRequestOpt = new InstanceRequest(0L,
CalciteSqlCompiler.compileToBrokerRequest(query));
+ instanceRequestOpt.getQuery()
+ .getPinotQuery()
+
.putToQueryOptions(CommonConstants.Broker.Request.QueryOptionKey.IGNORE_MISSING_SEGMENTS,
"true");
+ instanceRequestOpt.setSearchSegments(_segmentNames);
+ InstanceResponseBlock responseOpt =
_queryExecutor.execute(getQueryRequest(instanceRequestOpt), QUERY_RUNNERS);
+ Map<Integer, String> exceptionsOpt = responseOpt.getExceptions();
+
assertFalse(exceptionsOpt.containsKey(QueryErrorCode.SERVER_SEGMENT_MISSING.getId()));
+ }
+
@AfterClass
public void tearDown() {
for (IndexSegment segment : _indexSegments) {
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
index 0f8ce988e47..06560fb178d 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Broker;
import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.testng.annotations.Test;
@@ -79,4 +80,47 @@ public class BrokerReduceServiceTest {
assertEquals(exceptions.size(), 1);
assertEquals(exceptions.get(0).getErrorCode(),
QueryErrorCode.BROKER_TIMEOUT.getId());
}
+
+ @Test
+ public void testIgnoreMissingSegmentsFiltering()
+ throws Exception {
+ // Build a simple broker reduce service
+ BrokerReduceService brokerReduceService =
+ new BrokerReduceService(new
PinotConfiguration(Map.of(Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2)));
+
+ // Prepare a broker request with queryOptions toggled
+ BrokerRequest brokerRequestNoIgnore =
+ CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM
testTable");
+ BrokerRequest brokerRequestIgnore =
+ CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM
testTable");
+ brokerRequestIgnore.getPinotQuery().putToQueryOptions(
+ CommonConstants.Broker.Request.QueryOptionKey.IGNORE_MISSING_SEGMENTS,
"true");
+
+ // Create a metadata-only DataTable with a SERVER_SEGMENT_MISSING exception
+ DataTableBuilder dataTableBuilder =
DataTableBuilderFactory.getDataTableBuilder(
+ new DataSchema(new String[]{"count(*)"}, new
ColumnDataType[]{ColumnDataType.LONG}));
+ // no rows; build data table and then mark it metadata-only
+ DataTable dataTable = dataTableBuilder.build().toMetadataOnlyDataTable();
+ dataTable.addException(QueryErrorCode.SERVER_SEGMENT_MISSING,
+ "1 segments [segA] missing on server: Server_localhost_12345");
+
+ Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+ dataTableMap.put(new ServerRoutingInstance("localhost", 12345,
TableType.OFFLINE), dataTable);
+
+ // Case 1: ignoreMissingSegments=false (default) -> exception should be
present
+ BrokerResponseNative responseNoIgnore =
brokerReduceService.reduceOnDataTable(
+ brokerRequestNoIgnore, brokerRequestNoIgnore, dataTableMap, 10_000,
mock(BrokerMetrics.class));
+ long missingErrCountNoIgnore = responseNoIgnore.getExceptions().stream()
+ .filter(e -> e.getErrorCode() ==
QueryErrorCode.SERVER_SEGMENT_MISSING.getId()).count();
+ assertEquals(missingErrCountNoIgnore, 1L);
+
+ // Case 2: ignoreMissingSegments=true -> exception should be filtered out
+ BrokerResponseNative responseIgnore =
brokerReduceService.reduceOnDataTable(
+ brokerRequestIgnore, brokerRequestIgnore, dataTableMap, 10_000,
mock(BrokerMetrics.class));
+ long missingErrCountIgnore = responseIgnore.getExceptions().stream()
+ .filter(e -> e.getErrorCode() ==
QueryErrorCode.SERVER_SEGMENT_MISSING.getId()).count();
+ assertEquals(missingErrCountIgnore, 0L);
+
+ brokerReduceService.shutDown();
+ }
}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
index a48b12563d1..3956d1d31b7 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
@@ -25,15 +25,27 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeoutException;
import java.util.function.Predicate;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
import org.apache.pinot.core.transport.ServerRoutingInstance;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.Test;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;
@@ -91,6 +103,48 @@ public class StreamingReduceServiceTest {
(cause) -> cause instanceof TimeoutException));
}
+ @Test
+ public void testIgnoreMissingSegmentsFiltering()
+ throws Exception {
+ // Build a metadata-only DataTable with a SERVER_SEGMENT_MISSING exception
encoded as a streaming response
+ DataSchema schema =
+ new DataSchema(new String[]{"col1"}, new
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
+ DataTableBuilder builder =
DataTableBuilderFactory.getDataTableBuilder(schema);
+ DataTable dataTable = builder.build().toMetadataOnlyDataTable();
+ dataTable.addException(QueryErrorCode.SERVER_SEGMENT_MISSING,
+ "1 segments [segA] missing on server: Server_localhost_9527");
+ // Set a request id in metadata so routing handling can route to the
active async response
+ dataTable.getMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(),
"1");
+ byte[] payload = dataTable.toBytes();
+
+ // Mock one server streaming response yielding the metadata-only block
+ Iterator<Server.ServerResponse> mockedResponse =
(Iterator<Server.ServerResponse>) mock(Iterator.class);
+ when(mockedResponse.hasNext()).thenReturn(true, false);
+ Server.ServerResponse resp = Server.ServerResponse.newBuilder()
+ .setPayload(com.google.protobuf.ByteString.copyFrom(payload)).build();
+ when(mockedResponse.next()).thenReturn(resp);
+
+ // Prepare inputs for reduceOnStreamResponse
+ StreamingReduceService service = new StreamingReduceService(new
PinotConfiguration(java.util.Map.of()));
+ BrokerRequest brokerRequest =
CalciteSqlCompiler.compileToBrokerRequest("SELECT col1 FROM testTable LIMIT 1");
+ // Set the query option to ignore missing segments
+ brokerRequest.getPinotQuery()
+
.putToQueryOptions(CommonConstants.Broker.Request.QueryOptionKey.IGNORE_MISSING_SEGMENTS,
"true");
+
+ java.util.Map<ServerRoutingInstance, Iterator<Server.ServerResponse>>
serverResponseMap = java.util.Map.of(
+ new ServerRoutingInstance("localhost", 9527, TableType.OFFLINE),
mockedResponse);
+
+ BrokerMetrics metrics = mock(BrokerMetrics.class);
+ // Execute
+ BrokerResponseNative response =
service.reduceOnStreamResponse(brokerRequest, serverResponseMap, 1000, metrics);
+
+ // Validate the SERVER_SEGMENT_MISSING was filtered out
+ boolean hasMissing = response.getExceptions()
+ .stream()
+ .anyMatch(e -> e.getErrorCode() ==
QueryErrorCode.SERVER_SEGMENT_MISSING.getId());
+ assertFalse(hasMissing);
+ }
+
private static boolean verifyException(Callable<Void> verifyTarget,
Predicate<Throwable> verifyCause) {
boolean exceptionVerified = false;
if (verifyTarget == null || verifyCause == null) {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index fa4d0adc302..4f8d3806a4f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -458,6 +458,12 @@ public class CommonConstants {
public static final String CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER =
"pinot.broker.enable.partition.metadata.manager";
public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER =
true;
+
+ // When enabled, the broker will set a query option to ignore
SERVER_SEGMENT_MISSING errors from servers.
+ // This is useful to tolerate short windows where routing has not yet
reflected recently deleted segments.
+ public static final String CONFIG_OF_IGNORE_MISSING_SEGMENTS =
+ "pinot.broker.query.ignore.missing.segments";
+ public static final boolean DEFAULT_IGNORE_MISSING_SEGMENTS = false;
// Whether to infer partition hint by default or not.
// This value can always be overridden by INFER_PARTITION_HINT query option
public static final String CONFIG_OF_INFER_PARTITION_HINT =
"pinot.broker.multistage.infer.partition.hint";
@@ -702,6 +708,10 @@ public class CommonConstants {
// If query submission causes an exception, still continue to submit
the query to other servers
public static final String SKIP_UNAVAILABLE_SERVERS =
"skipUnavailableServers";
+ // Ignore server-side segment missing errors and proceed without
marking the query as failed.
+ // When set to true, SERVER_SEGMENT_MISSING exceptions are filtered
out on the broker.
+ public static final String IGNORE_MISSING_SEGMENTS =
"ignoreMissingSegments";
+
// Indicates that a query belongs to a secondary workload when using
the BinaryWorkloadScheduler. The
// BinaryWorkloadScheduler divides queries into two workloads, primary
and secondary. Primary workloads are
// executed in an Unbounded FCFS fashion. However, secondary
workloads are executed in a constrained FCFS
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]