This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 285a18b8442 broker/server: add ignoreMissingSegments query option with 
broker config; skip SERVER_SEGMENT_MISSING errors when enabled; add SSE/MSE 
wiring and unit tests (#16556)
285a18b8442 is described below

commit 285a18b84427e7e8e7e60ea8970e9fc2d5d325da
Author: Xiang Fu <[email protected]>
AuthorDate: Sun Aug 17 23:57:54 2025 -0700

    broker/server: add ignoreMissingSegments query option with broker config; 
skip SERVER_SEGMENT_MISSING errors when enabled; add SSE/MSE wiring and unit 
tests (#16556)
---
 .../BaseSingleStageBrokerRequestHandler.java       | 11 +++++
 .../MultiStageBrokerRequestHandler.java            | 40 +++++++++-------
 .../common/utils/config/QueryOptionsUtils.java     |  4 ++
 .../common/utils/config/QueryOptionsUtilsTest.java | 13 ++++++
 .../query/executor/ServerQueryExecutorV1Impl.java  |  2 +-
 .../core/query/reduce/BrokerReduceService.java     |  8 ++++
 .../core/query/reduce/StreamingReduceService.java  |  8 ++++
 .../executor/QueryExecutorExceptionsTest.java      | 25 ++++++++++
 .../core/query/reduce/BrokerReduceServiceTest.java | 44 ++++++++++++++++++
 .../query/reduce/StreamingReduceServiceTest.java   | 54 ++++++++++++++++++++++
 .../apache/pinot/spi/utils/CommonConstants.java    | 10 ++++
 11 files changed, 201 insertions(+), 18 deletions(-)

diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
index c829ae8427b..199fbdf9d76 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/BaseSingleStageBrokerRequestHandler.java
@@ -745,6 +745,8 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
           serverBrokerRequest.getPinotQuery().getQueryOptions()
               .put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
         }
+        // Optionally set ignoreMissingSegments based on broker config
+        setIgnoreMissingSegmentsIfConfigured(queryOptions);
       }
     }
     if (realtimeBrokerRequest != null) {
@@ -758,6 +760,8 @@ public abstract class BaseSingleStageBrokerRequestHandler 
extends BaseBrokerRequ
           serverBrokerRequest.getPinotQuery().getQueryOptions()
               .put(QueryOptionKey.SERVER_RETURN_FINAL_RESULT, "true");
         }
+        // Optionally set ignoreMissingSegments based on broker config
+        setIgnoreMissingSegmentsIfConfigured(queryOptions);
       }
     }
 
@@ -2082,6 +2086,13 @@ public abstract class 
BaseSingleStageBrokerRequestHandler extends BaseBrokerRequ
     return _brokerId + "_" + requestId;
   }
 
+  private void setIgnoreMissingSegmentsIfConfigured(Map<String, String> 
queryOptions) {
+    if 
(_config.getProperty(CommonConstants.Broker.CONFIG_OF_IGNORE_MISSING_SEGMENTS,
+        CommonConstants.Broker.DEFAULT_IGNORE_MISSING_SEGMENTS)) {
+      queryOptions.putIfAbsent(QueryOptionKey.IGNORE_MISSING_SEGMENTS, "true");
+    }
+  }
+
   /**
    * Helper class to pass the per server statistics.
    */
diff --git 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
index f841e46aafc..a366fd07144 100644
--- 
a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
+++ 
b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java
@@ -120,7 +120,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
   ///      </AppenderRef>
   ///    </Logger>
   ///  </Loggers>
-  /// ```
+  ///```
   private static final Marker MSE_STATS_MARKER = 
MarkerFactory.getMarker("MSE_STATS_MARKER");
 
   private static final int NUM_UNAVAILABLE_SEGMENTS_TO_LOG = 10;
@@ -166,9 +166,9 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
         
CommonConstants.MultiStageQueryRunner.DEFAULT_OF_MULTISTAGE_EXPLAIN_INCLUDE_SEGMENT_PLAN);
     _queryThrottler = queryThrottler;
     _queryCompileExecutor = QueryThreadContext.contextAwareExecutorService(
-            Executors.newFixedThreadPool(
-                Math.max(1, Runtime.getRuntime().availableProcessors() / 2),
-                new NamedThreadFactory("multi-stage-query-compile-executor")));
+        Executors.newFixedThreadPool(
+            Math.max(1, Runtime.getRuntime().availableProcessors() / 2),
+            new NamedThreadFactory("multi-stage-query-compile-executor")));
   }
 
   @Override
@@ -442,7 +442,6 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     return passiveTimeoutMsFromQueryOption != null ? 
passiveTimeoutMsFromQueryOption : _extraPassiveTimeoutMs;
   }
 
-
   /**
    * Explains the query and returns the broker response.
    *
@@ -463,7 +462,6 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
     QueryEnvironment.QueryPlannerResult queryPlanResult = callAsync(requestId, 
query.getTextQuery(),
         () -> query.explain(requestId, fragmentToPlanNode), timer);
     String plan = queryPlanResult.getExplainPlan();
-    Set<String> tableNames = queryPlanResult.getTableNames();
     Map<String, String> extraFields = queryPlanResult.getExtraFields();
     return constructMultistageExplainPlan(query.getTextQuery(), plan, 
extraFields);
   }
@@ -477,6 +475,12 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
 
     DispatchableSubPlan dispatchableSubPlan = queryPlanResult.getQueryPlan();
 
+    // Optionally set ignoreMissingSegments query option based on broker 
config if not already set.
+    if 
(_config.getProperty(CommonConstants.Broker.CONFIG_OF_IGNORE_MISSING_SEGMENTS,
+        CommonConstants.Broker.DEFAULT_IGNORE_MISSING_SEGMENTS)) {
+      
query.getOptions().putIfAbsent(CommonConstants.Broker.Request.QueryOptionKey.IGNORE_MISSING_SEGMENTS,
 "true");
+    }
+
     Set<QueryServerInstance> servers = new HashSet<>();
     for (DispatchablePlanFragment planFragment : 
dispatchableSubPlan.getQueryStageMap().values()) {
       servers.addAll(planFragment.getServerInstances());
@@ -537,7 +541,7 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       QueryDispatcher.QueryResult queryResults;
       try {
         queryResults = _queryDispatcher.submitAndReduce(requestContext, 
dispatchableSubPlan, timer.getRemainingTimeMs(),
-                query.getOptions());
+            query.getOptions());
       } catch (QueryException e) {
         throw e;
       } catch (Throwable t) {
@@ -581,17 +585,19 @@ public class MultiStageBrokerRequestHandler extends 
BaseBrokerRequestHandler {
       brokerResponse.setNumServersQueried(servers.size() - 1);
       brokerResponse.setNumServersResponded(servers.size() - 1);
 
-      // Attach unavailable segments
+      // Attach unavailable segments (unless configured to ignore missing 
segments)
       int numUnavailableSegments = 0;
-      for (Map.Entry<String, Set<String>> entry : 
dispatchableSubPlan.getTableToUnavailableSegmentsMap().entrySet()) {
-        String tableName = entry.getKey();
-        Set<String> unavailableSegments = entry.getValue();
-        int unavailableSegmentsInSubPlan = unavailableSegments.size();
-        numUnavailableSegments += unavailableSegmentsInSubPlan;
-        QueryProcessingException errMsg = new 
QueryProcessingException(QueryErrorCode.SERVER_SEGMENT_MISSING,
-            "Found " + unavailableSegmentsInSubPlan + " unavailable segments 
for table " + tableName + ": "
-                + toSizeLimitedString(unavailableSegments, 
NUM_UNAVAILABLE_SEGMENTS_TO_LOG));
-        brokerResponse.addException(errMsg);
+      if (!QueryOptionsUtils.isIgnoreMissingSegments(query.getOptions())) {
+        for (Map.Entry<String, Set<String>> entry : 
dispatchableSubPlan.getTableToUnavailableSegmentsMap().entrySet()) {
+          String tableName = entry.getKey();
+          Set<String> unavailableSegments = entry.getValue();
+          int unavailableSegmentsInSubPlan = unavailableSegments.size();
+          numUnavailableSegments += unavailableSegmentsInSubPlan;
+          QueryProcessingException errMsg = new 
QueryProcessingException(QueryErrorCode.SERVER_SEGMENT_MISSING,
+              "Found " + unavailableSegmentsInSubPlan + " unavailable segments 
for table " + tableName + ": "
+                  + toSizeLimitedString(unavailableSegments, 
NUM_UNAVAILABLE_SEGMENTS_TO_LOG));
+          brokerResponse.addException(errMsg);
+        }
       }
       requestContext.setNumUnavailableSegments(numUnavailableSegments);
 
diff --git 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
index 01acd526274..43b9036e741 100644
--- 
a/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
+++ 
b/pinot-common/src/main/java/org/apache/pinot/common/utils/config/QueryOptionsUtils.java
@@ -441,6 +441,10 @@ public class QueryOptionsUtils {
     return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.SKIP_UNAVAILABLE_SERVERS));
   }
 
+  public static boolean isIgnoreMissingSegments(Map<String, String> 
queryOptions) {
+    return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IGNORE_MISSING_SEGMENTS));
+  }
+
   public static boolean isSecondaryWorkload(Map<String, String> queryOptions) {
     return 
Boolean.parseBoolean(queryOptions.get(QueryOptionKey.IS_SECONDARY_WORKLOAD));
   }
diff --git 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
index b2ca6573b63..5e32fc83fbb 100644
--- 
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
+++ 
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/QueryOptionsUtilsTest.java
@@ -61,6 +61,19 @@ public class QueryOptionsUtilsTest {
     assertEquals(resolved.get(USE_MULTISTAGE_ENGINE), "false");
   }
 
+  @Test
+  public void shouldReadIgnoreMissingSegmentsOption() {
+    // Given:
+    Map<String, String> optsTrue = Map.of(IGNORE_MISSING_SEGMENTS, "true");
+    Map<String, String> optsFalse = Map.of(IGNORE_MISSING_SEGMENTS, "false");
+    Map<String, String> optsMissing = Map.of();
+
+    // Then:
+    
org.testng.Assert.assertTrue(QueryOptionsUtils.isIgnoreMissingSegments(optsTrue));
+    
org.testng.Assert.assertFalse(QueryOptionsUtils.isIgnoreMissingSegments(optsFalse));
+    
org.testng.Assert.assertFalse(QueryOptionsUtils.isIgnoreMissingSegments(optsMissing));
+  }
+
   @Test
   public void testSkipIndexesParsing() {
     String skipIndexesStr = "col1=inverted,range&col2=sorted";
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
index d63710dd3ff..ba07d3df1d8 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/executor/ServerQueryExecutorV1Impl.java
@@ -278,7 +278,7 @@ public class ServerQueryExecutorV1Impl implements 
QueryExecutor {
       List<String> missingSegments = executionInfo.getMissingSegments();
 
       int numMissingSegments = missingSegments.size();
-      if (numMissingSegments > 0) {
+      if ((numMissingSegments > 0) && 
(!QueryOptionsUtils.isIgnoreMissingSegments(queryContext.getQueryOptions()))) {
         instanceResponse.addException(QueryErrorCode.SERVER_SEGMENT_MISSING,
             numMissingSegments + " segments " + missingSegments + " missing on 
server: "
                 + _instanceDataManager.getInstanceId());
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
index 781dbd9b1df..498fd4fd8e1 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/BrokerReduceService.java
@@ -128,6 +128,14 @@ public class BrokerReduceService extends BaseReduceService 
{
     // Set execution statistics and Update broker metrics.
     aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);
 
+    // If configured, filter out SERVER_SEGMENT_MISSING exceptions emitted by 
servers. This must happen after
+    // aggregator.setStats(), because the aggregator appends server exceptions 
during setStats.
+    Map<String, String> brokerQueryOptions = 
brokerRequest.getPinotQuery().getQueryOptions();
+    if (brokerQueryOptions != null && 
QueryOptionsUtils.isIgnoreMissingSegments(brokerQueryOptions)) {
+      brokerResponseNative.getExceptions().removeIf(
+          ex -> ex.getErrorCode() == 
QueryErrorCode.SERVER_SEGMENT_MISSING.getId());
+    }
+
     // Report the servers with conflicting data schema.
     if (!serversWithConflictingDataSchema.isEmpty()) {
       QueryErrorCode errorCode = QueryErrorCode.MERGE_RESPONSE;
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
index 8b61a97d554..9802b7d925c 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/query/reduce/StreamingReduceService.java
@@ -38,6 +38,7 @@ import 
org.apache.pinot.core.query.request.context.QueryContext;
 import 
org.apache.pinot.core.query.request.context.utils.QueryContextConverterUtils;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
@@ -115,6 +116,13 @@ public class StreamingReduceService extends 
BaseReduceService {
     // Set execution statistics and Update broker metrics.
     aggregator.setStats(rawTableName, brokerResponseNative, brokerMetrics);
 
+    // If configured, filter out SERVER_SEGMENT_MISSING exceptions emitted by 
servers. This must happen after
+    // aggregator.setStats(), because the aggregator appends server exceptions 
during setStats.
+    if (queryOptions != null && 
QueryOptionsUtils.isIgnoreMissingSegments(queryOptions)) {
+      brokerResponseNative.getExceptions().removeIf(
+          ex -> ex.getErrorCode() == 
QueryErrorCode.SERVER_SEGMENT_MISSING.getId());
+    }
+
     updateAlias(queryContext, brokerResponseNative);
     return brokerResponseNative;
   }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
index 7c9bd2e4f49..ca98739e4da 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/executor/QueryExecutorExceptionsTest.java
@@ -55,6 +55,7 @@ import org.apache.pinot.spi.data.readers.FileFormat;
 import org.apache.pinot.spi.env.CommonsConfigurationUtils;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.ReadMode;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -171,6 +172,30 @@ public class QueryExecutorExceptionsTest {
     assertEqualsNoOrder(actualMissingSegments, expectedMissingSegments);
   }
 
+  /**
+   * When ignoreMissingSegments is set in queryOptions, the server should not 
populate SERVER_SEGMENT_MISSING exception.
+   */
+  @Test
+  public void testServerSegmentMissingExceptionIgnoredByOption() {
+    String query = "SELECT COUNT(*) FROM " + OFFLINE_TABLE_NAME;
+    // 1) Without the option -> we should see SERVER_SEGMENT_MISSING
+    InstanceRequest instanceRequestNoOpt = new InstanceRequest(0L, 
CalciteSqlCompiler.compileToBrokerRequest(query));
+    instanceRequestNoOpt.setSearchSegments(_segmentNames);
+    InstanceResponseBlock responseNoOpt = 
_queryExecutor.execute(getQueryRequest(instanceRequestNoOpt), QUERY_RUNNERS);
+    Map<Integer, String> exceptionsNoOpt = responseNoOpt.getExceptions();
+    
assertTrue(exceptionsNoOpt.containsKey(QueryErrorCode.SERVER_SEGMENT_MISSING.getId()));
+
+    // 2) With ignoreMissingSegments=true -> we should NOT see 
SERVER_SEGMENT_MISSING
+    InstanceRequest instanceRequestOpt = new InstanceRequest(0L, 
CalciteSqlCompiler.compileToBrokerRequest(query));
+    instanceRequestOpt.getQuery()
+        .getPinotQuery()
+        
.putToQueryOptions(CommonConstants.Broker.Request.QueryOptionKey.IGNORE_MISSING_SEGMENTS,
 "true");
+    instanceRequestOpt.setSearchSegments(_segmentNames);
+    InstanceResponseBlock responseOpt = 
_queryExecutor.execute(getQueryRequest(instanceRequestOpt), QUERY_RUNNERS);
+    Map<Integer, String> exceptionsOpt = responseOpt.getExceptions();
+    
assertFalse(exceptionsOpt.containsKey(QueryErrorCode.SERVER_SEGMENT_MISSING.getId()));
+  }
+
   @AfterClass
   public void tearDown() {
     for (IndexSegment segment : _indexSegments) {
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
index 0f8ce988e47..06560fb178d 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/BrokerReduceServiceTest.java
@@ -35,6 +35,7 @@ import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
 import org.apache.pinot.spi.env.PinotConfiguration;
 import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Broker;
 import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 import org.testng.annotations.Test;
@@ -79,4 +80,47 @@ public class BrokerReduceServiceTest {
     assertEquals(exceptions.size(), 1);
     assertEquals(exceptions.get(0).getErrorCode(), 
QueryErrorCode.BROKER_TIMEOUT.getId());
   }
+
+  @Test
+  public void testIgnoreMissingSegmentsFiltering()
+      throws Exception {
+    // Build a simple broker reduce service
+    BrokerReduceService brokerReduceService =
+        new BrokerReduceService(new 
PinotConfiguration(Map.of(Broker.CONFIG_OF_MAX_REDUCE_THREADS_PER_QUERY, 2)));
+
+    // Prepare a broker request with queryOptions toggled
+    BrokerRequest brokerRequestNoIgnore =
+        CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM 
testTable");
+    BrokerRequest brokerRequestIgnore =
+        CalciteSqlCompiler.compileToBrokerRequest("SELECT COUNT(*) FROM 
testTable");
+    brokerRequestIgnore.getPinotQuery().putToQueryOptions(
+        CommonConstants.Broker.Request.QueryOptionKey.IGNORE_MISSING_SEGMENTS, 
"true");
+
+    // Create a metadata-only DataTable with a SERVER_SEGMENT_MISSING exception
+    DataTableBuilder dataTableBuilder = 
DataTableBuilderFactory.getDataTableBuilder(
+        new DataSchema(new String[]{"count(*)"}, new 
ColumnDataType[]{ColumnDataType.LONG}));
+    // no rows; build data table and then mark it metadata-only
+    DataTable dataTable = dataTableBuilder.build().toMetadataOnlyDataTable();
+    dataTable.addException(QueryErrorCode.SERVER_SEGMENT_MISSING,
+        "1 segments [segA] missing on server: Server_localhost_12345");
+
+    Map<ServerRoutingInstance, DataTable> dataTableMap = new HashMap<>();
+    dataTableMap.put(new ServerRoutingInstance("localhost", 12345, 
TableType.OFFLINE), dataTable);
+
+    // Case 1: ignoreMissingSegments=false (default) -> exception should be 
present
+    BrokerResponseNative responseNoIgnore = 
brokerReduceService.reduceOnDataTable(
+        brokerRequestNoIgnore, brokerRequestNoIgnore, dataTableMap, 10_000, 
mock(BrokerMetrics.class));
+    long missingErrCountNoIgnore = responseNoIgnore.getExceptions().stream()
+        .filter(e -> e.getErrorCode() == 
QueryErrorCode.SERVER_SEGMENT_MISSING.getId()).count();
+    assertEquals(missingErrCountNoIgnore, 1L);
+
+    // Case 2: ignoreMissingSegments=true -> exception should be filtered out
+    BrokerResponseNative responseIgnore = 
brokerReduceService.reduceOnDataTable(
+        brokerRequestIgnore, brokerRequestIgnore, dataTableMap, 10_000, 
mock(BrokerMetrics.class));
+    long missingErrCountIgnore = responseIgnore.getExceptions().stream()
+        .filter(e -> e.getErrorCode() == 
QueryErrorCode.SERVER_SEGMENT_MISSING.getId()).count();
+    assertEquals(missingErrCountIgnore, 0L);
+
+    brokerReduceService.shutDown();
+  }
 }
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
index a48b12563d1..3956d1d31b7 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/query/reduce/StreamingReduceServiceTest.java
@@ -25,15 +25,27 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeoutException;
 import java.util.function.Predicate;
+import org.apache.pinot.common.datatable.DataTable;
+import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.proto.Server;
+import org.apache.pinot.common.request.BrokerRequest;
+import org.apache.pinot.common.response.broker.BrokerResponseNative;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.datatable.DataTableBuilder;
+import org.apache.pinot.core.common.datatable.DataTableBuilderFactory;
 import org.apache.pinot.core.transport.ServerRoutingInstance;
 import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.exception.QueryErrorCode;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.sql.parsers.CalciteSqlCompiler;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 import org.testng.annotations.Test;
 
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
 
@@ -91,6 +103,48 @@ public class StreamingReduceServiceTest {
         (cause) -> cause instanceof TimeoutException));
   }
 
+  @Test
+  public void testIgnoreMissingSegmentsFiltering()
+      throws Exception {
+    // Build a metadata-only DataTable with a SERVER_SEGMENT_MISSING exception 
encoded as a streaming response
+    DataSchema schema =
+        new DataSchema(new String[]{"col1"}, new 
DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.LONG});
+    DataTableBuilder builder = 
DataTableBuilderFactory.getDataTableBuilder(schema);
+    DataTable dataTable = builder.build().toMetadataOnlyDataTable();
+    dataTable.addException(QueryErrorCode.SERVER_SEGMENT_MISSING,
+        "1 segments [segA] missing on server: Server_localhost_9527");
+    // Set a request id in metadata so routing handling can route to the 
active async response
+    dataTable.getMetadata().put(DataTable.MetadataKey.REQUEST_ID.getName(), 
"1");
+    byte[] payload = dataTable.toBytes();
+
+    // Mock one server streaming response yielding the metadata-only block
+    Iterator<Server.ServerResponse> mockedResponse = 
(Iterator<Server.ServerResponse>) mock(Iterator.class);
+    when(mockedResponse.hasNext()).thenReturn(true, false);
+    Server.ServerResponse resp = Server.ServerResponse.newBuilder()
+        .setPayload(com.google.protobuf.ByteString.copyFrom(payload)).build();
+    when(mockedResponse.next()).thenReturn(resp);
+
+    // Prepare inputs for reduceOnStreamResponse
+    StreamingReduceService service = new StreamingReduceService(new 
PinotConfiguration(java.util.Map.of()));
+    BrokerRequest brokerRequest = 
CalciteSqlCompiler.compileToBrokerRequest("SELECT col1 FROM testTable LIMIT 1");
+    // Set the query option to ignore missing segments
+    brokerRequest.getPinotQuery()
+        
.putToQueryOptions(CommonConstants.Broker.Request.QueryOptionKey.IGNORE_MISSING_SEGMENTS,
 "true");
+
+    java.util.Map<ServerRoutingInstance, Iterator<Server.ServerResponse>> 
serverResponseMap = java.util.Map.of(
+        new ServerRoutingInstance("localhost", 9527, TableType.OFFLINE), 
mockedResponse);
+
+    BrokerMetrics metrics = mock(BrokerMetrics.class);
+    // Execute
+    BrokerResponseNative response = 
service.reduceOnStreamResponse(brokerRequest, serverResponseMap, 1000, metrics);
+
+    // Validate the SERVER_SEGMENT_MISSING was filtered out
+    boolean hasMissing = response.getExceptions()
+        .stream()
+        .anyMatch(e -> e.getErrorCode() == 
QueryErrorCode.SERVER_SEGMENT_MISSING.getId());
+    assertFalse(hasMissing);
+  }
+
   private static boolean verifyException(Callable<Void> verifyTarget, 
Predicate<Throwable> verifyCause) {
     boolean exceptionVerified = false;
     if (verifyTarget == null || verifyCause == null) {
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index fa4d0adc302..4f8d3806a4f 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -458,6 +458,12 @@ public class CommonConstants {
     public static final String CONFIG_OF_ENABLE_PARTITION_METADATA_MANAGER =
         "pinot.broker.enable.partition.metadata.manager";
     public static final boolean DEFAULT_ENABLE_PARTITION_METADATA_MANAGER = 
true;
+
+      // When enabled, the broker will set a query option to ignore 
SERVER_SEGMENT_MISSING errors from servers.
+      // This is useful to tolerate short windows where routing has not yet 
reflected recently deleted segments.
+      public static final String CONFIG_OF_IGNORE_MISSING_SEGMENTS =
+          "pinot.broker.query.ignore.missing.segments";
+      public static final boolean DEFAULT_IGNORE_MISSING_SEGMENTS = false;
     // Whether to infer partition hint by default or not.
     // This value can always be overridden by INFER_PARTITION_HINT query option
     public static final String CONFIG_OF_INFER_PARTITION_HINT = 
"pinot.broker.multistage.infer.partition.hint";
@@ -702,6 +708,10 @@ public class CommonConstants {
         // If query submission causes an exception, still continue to submit 
the query to other servers
         public static final String SKIP_UNAVAILABLE_SERVERS = 
"skipUnavailableServers";
 
+        // Ignore server-side segment missing errors and proceed without 
marking the query as failed.
+        // When set to true, SERVER_SEGMENT_MISSING exceptions are filtered 
out on the broker.
+        public static final String IGNORE_MISSING_SEGMENTS = 
"ignoreMissingSegments";
+
         // Indicates that a query belongs to a secondary workload when using 
the BinaryWorkloadScheduler. The
         // BinaryWorkloadScheduler divides queries into two workloads, primary 
and secondary. Primary workloads are
         // executed in an  Unbounded FCFS fashion. However, secondary 
workloads are executed in a constrainted FCFS


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to