This is an automated email from the ASF dual-hosted git repository.

yqm pushed a commit to branch 37.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/37.0.0 by this push:
     new b3bd2450e89 feat: Segment Timeout Per Datasource (#19221) (#19291)
b3bd2450e89 is described below

commit b3bd2450e8952732c6e463b07d325326b8245805
Author: Cece Mei <[email protected]>
AuthorDate: Fri Apr 10 17:07:17 2026 -0700

    feat: Segment Timeout Per Datasource (#19221) (#19291)
    
    Adds the ability to configure perSegmentTimeout per datasource via 
BrokerDynamicConfig, so different per-segment timeout thresholds can be set 
for different datasources, e.g. larger datasources may need different thresholds 
than small ones.
    
    Co-authored-by: mshahid6 <[email protected]>
---
 docs/api-reference/dynamic-configuration-api.md    |  72 ++++++-
 .../org/apache/druid/server/QueryLifecycle.java    |  54 ++++-
 .../apache/druid/server/QueryLifecycleFactory.java |  16 +-
 .../druid/server/broker/BrokerDynamicConfig.java   |  41 +++-
 .../server/broker/PerSegmentTimeoutConfig.java     |  90 +++++++++
 .../server/PerSegmentTimeoutInjectionTest.java     | 223 +++++++++++++++++++++
 .../apache/druid/server/QueryLifecycleTest.java    |   3 +
 .../org/apache/druid/server/QueryResourceTest.java |   1 +
 .../server/broker/BrokerDynamicConfigTest.java     |  31 +++
 .../server/broker/PerSegmentTimeoutConfigTest.java |  82 ++++++++
 .../broker-dynamic-config-completions.ts           |  20 ++
 .../broker-dynamic-config.tsx                      |  18 ++
 12 files changed, 634 insertions(+), 17 deletions(-)

diff --git a/docs/api-reference/dynamic-configuration-api.md 
b/docs/api-reference/dynamic-configuration-api.md
index 80433a54103..95166aa3c54 100644
--- a/docs/api-reference/dynamic-configuration-api.md
+++ b/docs/api-reference/dynamic-configuration-api.md
@@ -376,6 +376,12 @@ Host: http://ROUTER_IP:ROUTER_PORT
   "queryContext": {
     "priority": 0,
     "timeout": 300000
+  },
+  "perSegmentTimeoutConfig": {
+    "large_table": {
+      "perSegmentTimeoutMs": 10000,
+      "monitorOnly": false
+    }
   }
 }
 ```
@@ -445,6 +451,15 @@ curl -X POST 
"http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/broker/config" \
   "queryContext": {
     "priority": 0,
     "timeout": 300000
+  },
+  "perSegmentTimeoutConfig": {
+    "large_table": {
+      "perSegmentTimeoutMs": 10000
+    },
+    "huge_dataset": {
+      "perSegmentTimeoutMs": 5000,
+      "monitorOnly": true
+    }
   }
 }'
 ```
@@ -477,6 +492,15 @@ X-Druid-Comment: Add query blocklist rules and set default 
context
   "queryContext": {
     "priority": 0,
     "timeout": 300000
+  },
+  "perSegmentTimeoutConfig": {
+    "large_table": {
+      "perSegmentTimeoutMs": 10000
+    },
+    "huge_dataset": {
+      "perSegmentTimeoutMs": 5000,
+      "monitorOnly": true
+    }
   }
 }
 ```
@@ -496,6 +520,7 @@ The following table shows the dynamic configuration 
properties for the Broker.
 |--------|-----------|-------|
 |`queryBlocklist`| List of rules to block queries based on datasource, query 
type, and/or query context parameters. Each rule defines criteria that are 
combined with AND logic. Blocked queries return an HTTP 403 error. See [Query 
blocklist rules](#query-blocklist-rules) for details.|none|
 |`queryContext`| Map of default query context key-value pairs applied to all 
queries on this broker. These values override static defaults set via runtime 
properties (`druid.query.default.context.*`) but are overridden by context 
values supplied in individual query payloads. Useful for setting cluster-wide 
defaults such as `priority` or `timeout` without restarting. See [Query context 
reference](../querying/query-context-reference.md) for available keys.|none|
+|`perSegmentTimeoutConfig`| Map of datasource names to per-segment timeout 
configurations. When a query targets a datasource in this map, the Broker 
injects the configured `perSegmentTimeout` into the query context before 
forwarding to Historicals. See [Per-segment timeout 
configuration](#per-segment-timeout-configuration) for details.|none|
 
 #### Query blocklist rules
 
@@ -531,6 +556,51 @@ When a query is blocked, the Broker returns an HTTP 403 
error with a message ind
 }
 ```
 
+#### Per-segment timeout configuration
+
+Per-segment timeout configuration allows operators to set per-datasource 
segment processing timeouts without restarting the cluster. This is useful when 
different datasources have different performance characteristics — for example, 
allowing longer timeouts for larger datasets.
+
+Each entry in the `perSegmentTimeoutConfig` map is keyed by datasource name 
and has the following properties:
+
+|Property|Description|Required|Default|
+|--------|-----------|--------|-------|
+|`perSegmentTimeoutMs`|Per-segment processing timeout in milliseconds. Must be 
greater than 0.|Yes|N/A|
+|`monitorOnly`|When `true`, the timeout value is logged but not enforced. 
Useful for observing the impact of a timeout before enabling 
enforcement.|No|`false`|
+
+**Precedence order** (highest to lowest):
+
+1. User-supplied `perSegmentTimeout` in the query context
+2. Per-datasource value from `perSegmentTimeoutConfig`
+3. Cluster-wide default from `druid.query.default.context.perSegmentTimeout`
+
+> **Note:** For queries involving multiple datasources (e.g. joins or unions), 
the timeout from the
+> first matching datasource is applied. The match order is non-deterministic.
+> To avoid this, configure the same timeout for all datasources involved in 
such queries, or set `perSegmentTimeout` explicitly in the query context.
+
+**Example configuration:**
+
+```json
+{
+  "perSegmentTimeoutConfig": {
+    "large_datasource": {
+      "perSegmentTimeoutMs": 10000
+    },
+    "critical_datasource": {
+      "perSegmentTimeoutMs": 5000,
+      "monitorOnly": true
+    }
+  }
+}
+```
+
+To clear all per-datasource timeouts, POST an empty map:
+
+```json
+{
+  "perSegmentTimeoutConfig": {}
+}
+```
+
 ### Get broker dynamic configuration history
 
 Retrieves the history of changes to Broker dynamic configuration over an 
interval of time. Returns an empty array if there are no history records 
available.
@@ -603,7 +673,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
       "comment": "Add query blocklist rules",
       "ip": "127.0.0.1"
     },
-    "payload": 
"{\"queryBlocklist\":[{\"ruleName\":\"block-expensive-scans\",\"dataSources\":[\"large_table\"],\"queryTypes\":[\"scan\"]}],\"queryContext\":{\"priority\":0,\"timeout\":300000}}",
+    "payload": 
"{\"queryBlocklist\":[{\"ruleName\":\"block-expensive-scans\",\"dataSources\":[\"large_table\"],\"queryTypes\":[\"scan\"]}],\"queryContext\":{\"priority\":0,\"timeout\":300000},\"perSegmentTimeoutConfig\":{\"large_table\":{\"perSegmentTimeoutMs\":10000,\"monitorOnly\":false}}}",
     "auditTime": "2024-03-06T12:00:00.000Z"
   }
 ]
diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java 
b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
index def4bb5b831..9a514b85e2d 100644
--- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
+++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
@@ -49,6 +49,7 @@ import org.apache.druid.query.QueryTimeoutException;
 import org.apache.druid.query.QueryToolChest;
 import org.apache.druid.query.context.ResponseContext;
 import org.apache.druid.query.policy.PolicyEnforcer;
+import org.apache.druid.server.broker.PerSegmentTimeoutConfig;
 import org.apache.druid.server.log.RequestLogger;
 import org.apache.druid.server.security.Action;
 import org.apache.druid.server.security.AuthConfig;
@@ -63,6 +64,7 @@ import 
org.checkerframework.checker.nullness.qual.MonotonicNonNull;
 
 import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -101,6 +103,7 @@ public class QueryLifecycle
   private final AuthConfig authConfig;
   private final PolicyEnforcer policyEnforcer;
   private final List<QueryBlocklistRule> queryBlocklist;
+  private final Map<String, PerSegmentTimeoutConfig> perSegmentTimeoutConfig;
   private final long startMs;
   private final long startNs;
 
@@ -124,6 +127,7 @@ public class QueryLifecycle
       final AuthConfig authConfig,
       final PolicyEnforcer policyEnforcer,
       final List<QueryBlocklistRule> queryBlocklist,
+      final Map<String, PerSegmentTimeoutConfig> perSegmentTimeoutConfig,
       final long startMs,
       final long startNs
   )
@@ -138,6 +142,7 @@ public class QueryLifecycle
     this.authConfig = authConfig;
     this.policyEnforcer = policyEnforcer;
     this.queryBlocklist = queryBlocklist;
+    this.perSegmentTimeoutConfig = perSegmentTimeoutConfig;
     this.startMs = startMs;
     this.startNs = startNs;
   }
@@ -216,15 +221,52 @@ public class QueryLifecycle
       queryId = UUID.randomUUID().toString();
     }
 
-    Map<String, Object> mergedUserAndConfigContext = QueryContexts.override(
-        queryConfigProvider.getContext(),
-        baseQuery.getContext()
-    );
-    mergedUserAndConfigContext.put(BaseQuery.QUERY_ID, queryId);
-    this.baseQuery = 
baseQuery.withOverriddenContext(mergedUserAndConfigContext);
+    // Start with system defaults, apply per-datasource override, then user 
context wins
+    Map<String, Object> contextWithDefaults = new 
HashMap<>(queryConfigProvider.getContext());
+    applyPerDatasourcePerSegmentTimeout(baseQuery, contextWithDefaults, 
queryId);
+    Map<String, Object> finalContext = 
QueryContexts.override(contextWithDefaults, baseQuery.getContext());
+    finalContext.put(BaseQuery.QUERY_ID, queryId);
+
+    this.baseQuery = baseQuery.withOverriddenContext(finalContext);
     this.toolChest = conglomerate.getToolChest(this.baseQuery);
   }
 
+  /**
+   * If a per-datasource per-segment timeout is configured, injects it into 
the context defaults.
+   * User context (applied later via {@link QueryContexts#override}) will 
override this if set explicitly.
+   * In monitorOnly mode, logs the configured timeout but does not inject it.
+   *
+   * For queries involving multiple datasources (e.g., joins or unions), the 
timeout from the first matching datasource is applied.
+   * Since getTableNames() returns a Set, the match order is non-deterministic.
+   */
+  private void applyPerDatasourcePerSegmentTimeout(
+      final Query<?> query,
+      final Map<String, Object> contextWithDefaults,
+      final String queryId
+  )
+  {
+    if (perSegmentTimeoutConfig.isEmpty()) {
+      return;
+    }
+
+    for (String tableName : query.getDataSource().getTableNames()) {
+      PerSegmentTimeoutConfig dsConfig = 
perSegmentTimeoutConfig.get(tableName);
+      if (dsConfig != null) {
+        if (dsConfig.isMonitorOnly()) {
+          log.debug(
+              "Per-segment timeout [%d ms] configured for datasource [%s] in 
monitorOnly mode (not enforced) for query [%s].",
+              dsConfig.getPerSegmentTimeoutMs(),
+              tableName,
+              queryId
+          );
+        } else {
+          contextWithDefaults.put(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 
dsConfig.getPerSegmentTimeoutMs());
+        }
+        return;
+      }
+    }
+  }
+
   /**
    * Returns {@link AuthorizationResult} based on {@code 
DRUID_AUTHENTICATION_RESULT} in the given request, base query
    * would be transformed with restrictions on the AuthorizationResult.
diff --git 
a/server/src/main/java/org/apache/druid/server/QueryLifecycleFactory.java 
b/server/src/main/java/org/apache/druid/server/QueryLifecycleFactory.java
index b32a5aafffb..57fcd1b47e9 100644
--- a/server/src/main/java/org/apache/druid/server/QueryLifecycleFactory.java
+++ b/server/src/main/java/org/apache/druid/server/QueryLifecycleFactory.java
@@ -28,6 +28,7 @@ import org.apache.druid.query.QueryConfigProvider;
 import org.apache.druid.query.QueryRunnerFactoryConglomerate;
 import org.apache.druid.query.QuerySegmentWalker;
 import org.apache.druid.query.policy.PolicyEnforcer;
+import org.apache.druid.server.broker.PerSegmentTimeoutConfig;
 import org.apache.druid.server.log.RequestLogger;
 import org.apache.druid.server.security.AuthConfig;
 import org.apache.druid.server.security.AuthorizerMapper;
@@ -35,6 +36,7 @@ import org.apache.druid.server.security.AuthorizerMapper;
 import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 
 @LazySingleton
 public class QueryLifecycleFactory
@@ -78,10 +80,15 @@ public class QueryLifecycleFactory
 
   public QueryLifecycle factorize()
   {
-    final List<QueryBlocklistRule> queryBlocklist =
-        brokerViewOfBrokerConfig != null && 
brokerViewOfBrokerConfig.getDynamicConfig() != null
-        ? brokerViewOfBrokerConfig.getDynamicConfig().getQueryBlocklist()
-        : Collections.emptyList();
+    final List<QueryBlocklistRule> queryBlocklist;
+    final Map<String, PerSegmentTimeoutConfig> perSegmentTimeoutConfig;
+    if (brokerViewOfBrokerConfig != null && 
brokerViewOfBrokerConfig.getDynamicConfig() != null) {
+      queryBlocklist = 
brokerViewOfBrokerConfig.getDynamicConfig().getQueryBlocklist();
+      perSegmentTimeoutConfig = 
brokerViewOfBrokerConfig.getDynamicConfig().getPerSegmentTimeoutConfig();
+    } else {
+      queryBlocklist = Collections.emptyList();
+      perSegmentTimeoutConfig = Collections.emptyMap();
+    }
 
     return new QueryLifecycle(
         conglomerate,
@@ -94,6 +101,7 @@ public class QueryLifecycleFactory
         authConfig,
         policyEnforcer,
         queryBlocklist,
+        perSegmentTimeoutConfig,
         System.currentTimeMillis(),
         System.nanoTime()
     );
diff --git 
a/server/src/main/java/org/apache/druid/server/broker/BrokerDynamicConfig.java 
b/server/src/main/java/org/apache/druid/server/broker/BrokerDynamicConfig.java
index 45c646be8a7..57200e729d3 100644
--- 
a/server/src/main/java/org/apache/druid/server/broker/BrokerDynamicConfig.java
+++ 
b/server/src/main/java/org/apache/druid/server/broker/BrokerDynamicConfig.java
@@ -28,6 +28,7 @@ import org.apache.druid.server.QueryBlocklistRule;
 import javax.annotation.Nullable;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /**
@@ -52,14 +53,24 @@ public class BrokerDynamicConfig
    */
   private final QueryContext queryContext;
 
+  /**
+   * Per-datasource per-segment timeout configuration. Maps datasource name to 
its
+   * {@link PerSegmentTimeoutConfig}. When a query targets a datasource in 
this map,
+   * the broker injects the configured {@code perSegmentTimeout} into the 
query context
+   * (unless the caller already set it explicitly).
+   */
+  private final Map<String, PerSegmentTimeoutConfig> perSegmentTimeoutConfig;
+
   @JsonCreator
   public BrokerDynamicConfig(
       @JsonProperty("queryBlocklist") @Nullable List<QueryBlocklistRule> 
queryBlocklist,
-      @JsonProperty("queryContext") @Nullable QueryContext queryContext
+      @JsonProperty("queryContext") @Nullable QueryContext queryContext,
+      @JsonProperty("perSegmentTimeoutConfig") @Nullable Map<String, 
PerSegmentTimeoutConfig> perSegmentTimeoutConfig
   )
   {
     this.queryBlocklist = Configs.valueOrDefault(queryBlocklist, 
Collections.emptyList());
     this.queryContext = Configs.valueOrDefault(queryContext, 
QueryContext.empty());
+    this.perSegmentTimeoutConfig = 
Configs.valueOrDefault(perSegmentTimeoutConfig, Collections.emptyMap());
   }
 
   @JsonProperty
@@ -74,6 +85,12 @@ public class BrokerDynamicConfig
     return queryContext;
   }
 
+  @JsonProperty
+  public Map<String, PerSegmentTimeoutConfig> getPerSegmentTimeoutConfig()
+  {
+    return perSegmentTimeoutConfig;
+  }
+
   @Override
   public boolean equals(Object o)
   {
@@ -85,13 +102,14 @@ public class BrokerDynamicConfig
     }
     BrokerDynamicConfig that = (BrokerDynamicConfig) o;
     return Objects.equals(queryBlocklist, that.queryBlocklist)
-           && Objects.equals(queryContext, that.queryContext);
+           && Objects.equals(queryContext, that.queryContext)
+           && Objects.equals(perSegmentTimeoutConfig, 
that.perSegmentTimeoutConfig);
   }
 
   @Override
   public int hashCode()
   {
-    return Objects.hash(queryBlocklist, queryContext);
+    return Objects.hash(queryBlocklist, queryContext, perSegmentTimeoutConfig);
   }
 
   @Override
@@ -100,6 +118,7 @@ public class BrokerDynamicConfig
     return "BrokerDynamicConfig{" +
            "queryBlocklist=" + queryBlocklist +
            ", queryContext=" + queryContext +
+           ", perSegmentTimeoutConfig=" + perSegmentTimeoutConfig +
            '}';
   }
 
@@ -112,6 +131,7 @@ public class BrokerDynamicConfig
   {
     private List<QueryBlocklistRule> queryBlocklist;
     private QueryContext queryContext;
+    private Map<String, PerSegmentTimeoutConfig> perSegmentTimeoutConfig;
 
     public Builder()
     {
@@ -120,11 +140,13 @@ public class BrokerDynamicConfig
     @JsonCreator
     public Builder(
         @JsonProperty("queryBlocklist") @Nullable List<QueryBlocklistRule> 
queryBlocklist,
-        @JsonProperty("queryContext") @Nullable QueryContext queryContext
+        @JsonProperty("queryContext") @Nullable QueryContext queryContext,
+        @JsonProperty("perSegmentTimeoutConfig") @Nullable Map<String, 
PerSegmentTimeoutConfig> perSegmentTimeoutConfig
     )
     {
       this.queryBlocklist = queryBlocklist;
       this.queryContext = queryContext;
+      this.perSegmentTimeoutConfig = perSegmentTimeoutConfig;
     }
 
     public Builder withQueryBlocklist(List<QueryBlocklistRule> queryBlocklist)
@@ -139,9 +161,15 @@ public class BrokerDynamicConfig
       return this;
     }
 
+    public Builder withPerSegmentTimeoutConfig(Map<String, 
PerSegmentTimeoutConfig> perSegmentTimeoutConfig)
+    {
+      this.perSegmentTimeoutConfig = perSegmentTimeoutConfig;
+      return this;
+    }
+
     public BrokerDynamicConfig build()
     {
-      return new BrokerDynamicConfig(queryBlocklist, queryContext);
+      return new BrokerDynamicConfig(queryBlocklist, queryContext, 
perSegmentTimeoutConfig);
     }
 
     /**
@@ -152,7 +180,8 @@ public class BrokerDynamicConfig
     {
       return new BrokerDynamicConfig(
           Configs.valueOrDefault(queryBlocklist, defaults != null ? 
defaults.getQueryBlocklist() : null),
-          Configs.valueOrDefault(queryContext, defaults != null ? 
defaults.getQueryContext() : null)
+          Configs.valueOrDefault(queryContext, defaults != null ? 
defaults.getQueryContext() : null),
+          Configs.valueOrDefault(perSegmentTimeoutConfig, defaults != null ? 
defaults.getPerSegmentTimeoutConfig() : null)
       );
     }
   }
diff --git 
a/server/src/main/java/org/apache/druid/server/broker/PerSegmentTimeoutConfig.java
 
b/server/src/main/java/org/apache/druid/server/broker/PerSegmentTimeoutConfig.java
new file mode 100644
index 00000000000..58461a4996d
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/broker/PerSegmentTimeoutConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.broker;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Per-datasource configuration for per-segment timeout on the broker.
+ * Used within {@link BrokerDynamicConfig#getPerSegmentTimeoutConfig()}.
+ */
+public class PerSegmentTimeoutConfig
+{
+  private final long perSegmentTimeoutMs;
+  private final boolean monitorOnly;
+
+  @JsonCreator
+  public PerSegmentTimeoutConfig(
+      @JsonProperty("perSegmentTimeoutMs") long perSegmentTimeoutMs,
+      @JsonProperty("monitorOnly") @Nullable Boolean monitorOnly
+  )
+  {
+    Preconditions.checkArgument(perSegmentTimeoutMs > 0, "perSegmentTimeoutMs 
must be > 0, got [%s]", perSegmentTimeoutMs);
+    this.perSegmentTimeoutMs = perSegmentTimeoutMs;
+    this.monitorOnly = Configs.valueOrDefault(monitorOnly, false);
+  }
+
+  @JsonProperty
+  public long getPerSegmentTimeoutMs()
+  {
+    return perSegmentTimeoutMs;
+  }
+
+  @JsonProperty
+  public boolean isMonitorOnly()
+  {
+    return monitorOnly;
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    PerSegmentTimeoutConfig that = (PerSegmentTimeoutConfig) o;
+    return perSegmentTimeoutMs == that.perSegmentTimeoutMs
+           && monitorOnly == that.monitorOnly;
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(perSegmentTimeoutMs, monitorOnly);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "PerSegmentTimeoutConfig{" +
+           "perSegmentTimeoutMs=" + perSegmentTimeoutMs +
+           ", monitorOnly=" + monitorOnly +
+           '}';
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/PerSegmentTimeoutInjectionTest.java
 
b/server/src/test/java/org/apache/druid/server/PerSegmentTimeoutInjectionTest.java
new file mode 100644
index 00000000000..440f065ba48
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/PerSegmentTimeoutInjectionTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.GenericQueryMetricsFactory;
+import org.apache.druid.query.QueryConfigProvider;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.policy.NoopPolicyEnforcer;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.server.broker.PerSegmentTimeoutConfig;
+import org.apache.druid.server.log.RequestLogger;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class PerSegmentTimeoutInjectionTest
+{
+  private static final String DATASOURCE = "my_datasource";
+
+  private QueryRunnerFactoryConglomerate conglomerate;
+  private QuerySegmentWalker texasRanger;
+  private GenericQueryMetricsFactory metricsFactory;
+  private ServiceEmitter emitter;
+  private RequestLogger requestLogger;
+  private AuthorizerMapper authzMapper;
+  private QueryConfigProvider queryConfig;
+  private QueryToolChest toolChest;
+
+  private final TimeseriesQuery baseQuery = Druids.newTimeseriesQueryBuilder()
+      .dataSource(DATASOURCE)
+      .intervals(List.of(Intervals.ETERNITY))
+      .aggregators(new CountAggregatorFactory("count"))
+      .build();
+
+  @Before
+  public void setUp()
+  {
+    conglomerate = EasyMock.createMock(QueryRunnerFactoryConglomerate.class);
+    texasRanger = EasyMock.createMock(QuerySegmentWalker.class);
+    metricsFactory = EasyMock.createMock(GenericQueryMetricsFactory.class);
+    emitter = EasyMock.createMock(ServiceEmitter.class);
+    requestLogger = EasyMock.createNiceMock(RequestLogger.class);
+    authzMapper = EasyMock.createNiceMock(AuthorizerMapper.class);
+    queryConfig = EasyMock.createMock(QueryConfigProvider.class);
+    toolChest = EasyMock.createNiceMock(QueryToolChest.class);
+  }
+
+  @After
+  public void tearDown()
+  {
+    EasyMock.verify(conglomerate, queryConfig);
+  }
+
+  @Test
+  public void testPerDatasourceTimeout_applied()
+  {
+    Map<String, PerSegmentTimeoutConfig> config = Map.of(
+        DATASOURCE, new PerSegmentTimeoutConfig(5000, false)
+    );
+
+    expectDefaults();
+
+    QueryLifecycle lifecycle = createLifecycle(config);
+    lifecycle.initialize(baseQuery);
+
+    Assert.assertEquals(5000L, 
lifecycle.getQuery().context().getPerSegmentTimeout());
+  }
+
+  @Test
+  public void testPerDatasourceTimeout_userOverrideWins()
+  {
+    Map<String, PerSegmentTimeoutConfig> config = Map.of(
+        DATASOURCE, new PerSegmentTimeoutConfig(5000, false)
+    );
+
+    expectDefaults();
+
+    TimeseriesQuery queryWithUserTimeout = baseQuery.withOverriddenContext(
+        Map.of(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 2000L)
+    );
+
+    QueryLifecycle lifecycle = createLifecycle(config);
+    lifecycle.initialize(queryWithUserTimeout);
+
+    Assert.assertEquals(2000L, 
lifecycle.getQuery().context().getPerSegmentTimeout());
+  }
+
+  @Test
+  public void testPerDatasourceTimeout_monitorOnlyDoesNotInject()
+  {
+    // monitorOnly=true: config exists but should NOT be enforced
+    Map<String, PerSegmentTimeoutConfig> config = Map.of(
+        DATASOURCE, new PerSegmentTimeoutConfig(5000, true)
+    );
+
+    expectDefaults();
+
+    QueryLifecycle lifecycle = createLifecycle(config);
+    lifecycle.initialize(baseQuery);
+
+    Assert.assertFalse(
+        "monitorOnly should not inject perSegmentTimeout",
+        lifecycle.getQuery().context().usePerSegmentTimeout()
+    );
+  }
+
+  @Test
+  public void testPerDatasourceTimeout_noMatchingDatasource()
+  {
+    Map<String, PerSegmentTimeoutConfig> config = Map.of(
+        "other_datasource", new PerSegmentTimeoutConfig(5000, false)
+    );
+
+    expectDefaults();
+
+    QueryLifecycle lifecycle = createLifecycle(config);
+    lifecycle.initialize(baseQuery);
+
+    Assert.assertFalse(lifecycle.getQuery().context().usePerSegmentTimeout());
+  }
+
+  @Test
+  public void testPerDatasourceTimeout_overridesSystemDefault()
+  {
+    Map<String, PerSegmentTimeoutConfig> config = Map.of(
+        DATASOURCE, new PerSegmentTimeoutConfig(5000, false)
+    );
+
+    // System default sets perSegmentTimeout to 10000
+    EasyMock.expect(queryConfig.getContext())
+            .andReturn(Map.of(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 10000L))
+            .anyTimes();
+    
EasyMock.expect(conglomerate.getToolChest(EasyMock.anyObject())).andReturn(toolChest).once();
+    EasyMock.replay(conglomerate, queryConfig);
+
+    QueryLifecycle lifecycle = createLifecycle(config);
+    lifecycle.initialize(baseQuery);
+
+    Assert.assertEquals(5000L, 
lifecycle.getQuery().context().getPerSegmentTimeout());
+  }
+
+  @Test
+  public void testPrecedence_userOverridesPerDatasourceOverridesSystemDefault()
+  {
+    // System default: 10000, per-datasource: 5000, user: 2000 — user should 
win
+    Map<String, PerSegmentTimeoutConfig> config = Map.of(
+        DATASOURCE, new PerSegmentTimeoutConfig(5000, false)
+    );
+
+    EasyMock.expect(queryConfig.getContext())
+            .andReturn(Map.of(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 10000L))
+            .anyTimes();
+    
EasyMock.expect(conglomerate.getToolChest(EasyMock.anyObject())).andReturn(toolChest).once();
+    EasyMock.replay(conglomerate, queryConfig);
+
+    TimeseriesQuery queryWithUserTimeout = baseQuery.withOverriddenContext(
+        Map.of(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 2000L)
+    );
+
+    QueryLifecycle lifecycle = createLifecycle(config);
+    lifecycle.initialize(queryWithUserTimeout);
+
+    Assert.assertEquals(2000L, 
lifecycle.getQuery().context().getPerSegmentTimeout());
+  }
+
+  private void expectDefaults()
+  {
+    EasyMock.expect(queryConfig.getContext()).andReturn(Map.of()).anyTimes();
+    
EasyMock.expect(conglomerate.getToolChest(EasyMock.anyObject())).andReturn(toolChest).once();
+    EasyMock.replay(conglomerate, queryConfig);
+  }
+
+  private QueryLifecycle createLifecycle(Map<String, PerSegmentTimeoutConfig> 
perSegmentTimeoutConfig)
+  {
+    return new QueryLifecycle(
+        conglomerate,
+        texasRanger,
+        metricsFactory,
+        emitter,
+        requestLogger,
+        authzMapper,
+        queryConfig,
+        new AuthConfig(),
+        NoopPolicyEnforcer.instance(),
+        Collections.emptyList(),
+        perSegmentTimeoutConfig,
+        System.currentTimeMillis(),
+        System.nanoTime()
+    );
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/QueryLifecycleTest.java 
b/server/src/test/java/org/apache/druid/server/QueryLifecycleTest.java
index b478ab9befc..96880147f40 100644
--- a/server/src/test/java/org/apache/druid/server/QueryLifecycleTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryLifecycleTest.java
@@ -80,6 +80,7 @@ import org.junit.rules.ExpectedException;
 
 import javax.annotation.Nullable;
 import javax.servlet.http.HttpServletRequest;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -859,6 +860,7 @@ public class QueryLifecycleTest
         authConfig,
         policyEnforcer,
         queryBlocklist,
+        Collections.emptyMap(),
         System.currentTimeMillis(),
         System.nanoTime()
     );
@@ -910,6 +912,7 @@ public class QueryLifecycleTest
         authConfig,
         policyEnforcer,
         queryBlocklist,
+        Collections.emptyMap(),
         System.currentTimeMillis(),
         System.nanoTime()
     );
diff --git 
a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java 
b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index bf16044943e..d7a611bfc32 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -776,6 +776,7 @@ public class QueryResourceTest
                 new AuthConfig(),
                 NoopPolicyEnforcer.instance(),
                 null,
+                Collections.emptyMap(),
                 System.currentTimeMillis(),
                 System.nanoTime()
             )
diff --git 
a/server/src/test/java/org/apache/druid/server/broker/BrokerDynamicConfigTest.java
 
b/server/src/test/java/org/apache/druid/server/broker/BrokerDynamicConfigTest.java
index 06e4df77c0a..425ccbdd14e 100644
--- 
a/server/src/test/java/org/apache/druid/server/broker/BrokerDynamicConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/broker/BrokerDynamicConfigTest.java
@@ -31,6 +31,7 @@ import org.junit.Assert;
 import org.junit.Test;
 
 import java.util.List;
+import java.util.Map;
 
 public class BrokerDynamicConfigTest
 {
@@ -142,6 +143,36 @@ public class BrokerDynamicConfigTest
     Assert.assertTrue(actual.getQueryContext().isEmpty());
   }
 
+  @Test
+  public void testSerdeWithPerSegmentTimeoutConfig() throws Exception
+  {
+    String jsonStr = "{\n"
+                     + "  \"perSegmentTimeoutConfig\": {\n"
+                     + "    \"my_large_ds\": {\"perSegmentTimeoutMs\": 5000, \"monitorOnly\": true},\n"
+                     + "    \"my_other_ds\": {\"perSegmentTimeoutMs\": 3000}\n"
+                     + "  }\n"
+                     + "}\n";
+
+    BrokerDynamicConfig actual = mapper.readValue(
+        mapper.writeValueAsString(mapper.readValue(jsonStr, BrokerDynamicConfig.class)),
+        BrokerDynamicConfig.class
+    );
+
+    Map<String, PerSegmentTimeoutConfig> expected = ImmutableMap.of(
+        "my_large_ds", new PerSegmentTimeoutConfig(5000, true),
+        "my_other_ds", new PerSegmentTimeoutConfig(3000, null)
+    );
+    Assert.assertEquals(expected, actual.getPerSegmentTimeoutConfig());
+  }
+
+  @Test
+  public void testNullPerSegmentTimeoutConfigDefaultsToEmptyMap() throws Exception
+  {
+    BrokerDynamicConfig actual = mapper.readValue("{}", BrokerDynamicConfig.class);
+    Assert.assertNotNull(actual.getPerSegmentTimeoutConfig());
+    Assert.assertTrue(actual.getPerSegmentTimeoutConfig().isEmpty());
+  }
+
   @Test
   public void testEquals()
   {
diff --git a/server/src/test/java/org/apache/druid/server/broker/PerSegmentTimeoutConfigTest.java b/server/src/test/java/org/apache/druid/server/broker/PerSegmentTimeoutConfigTest.java
new file mode 100644
index 00000000000..80ceddf7e5e
--- /dev/null
+++ b/server/src/test/java/org/apache/druid/server/broker/PerSegmentTimeoutConfigTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.broker;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PerSegmentTimeoutConfigTest
+{
+  private final ObjectMapper mapper = TestHelper.makeJsonMapper();
+
+  @Test
+  public void testSerde() throws Exception
+  {
+    String json = "{\"perSegmentTimeoutMs\": 5000, \"monitorOnly\": true}";
+    PerSegmentTimeoutConfig config = mapper.readValue(json, PerSegmentTimeoutConfig.class);
+    Assert.assertEquals(5000L, config.getPerSegmentTimeoutMs());
+    Assert.assertTrue(config.isMonitorOnly());
+
+    // Round-trip
+    PerSegmentTimeoutConfig roundTripped = mapper.readValue(
+        mapper.writeValueAsString(config),
+        PerSegmentTimeoutConfig.class
+    );
+    Assert.assertEquals(config, roundTripped);
+  }
+
+  @Test
+  public void testMonitorOnlyDefaultsToFalse() throws Exception
+  {
+    String json = "{\"perSegmentTimeoutMs\": 3000}";
+    PerSegmentTimeoutConfig config = mapper.readValue(json, PerSegmentTimeoutConfig.class);
+    Assert.assertEquals(3000L, config.getPerSegmentTimeoutMs());
+    Assert.assertFalse(config.isMonitorOnly());
+  }
+
+  @Test
+  public void testRejectsZeroTimeout()
+  {
+    Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> new PerSegmentTimeoutConfig(0, null)
+    );
+  }
+
+  @Test
+  public void testRejectsNegativeTimeout()
+  {
+    Assert.assertThrows(
+        IllegalArgumentException.class,
+        () -> new PerSegmentTimeoutConfig(-1, null)
+    );
+  }
+
+  @Test
+  public void testEquals()
+  {
+    EqualsVerifier.forClass(PerSegmentTimeoutConfig.class)
+                  .usingGetClass()
+                  .verify();
+  }
+}
diff --git a/web-console/src/dialogs/broker-dynamic-config-dialog/broker-dynamic-config-completions.ts b/web-console/src/dialogs/broker-dynamic-config-dialog/broker-dynamic-config-completions.ts
index 6620dc721f0..f38664319d6 100644
--- a/web-console/src/dialogs/broker-dynamic-config-dialog/broker-dynamic-config-completions.ts
+++ b/web-console/src/dialogs/broker-dynamic-config-dialog/broker-dynamic-config-completions.ts
@@ -34,6 +34,11 @@ export const BROKER_DYNAMIC_CONFIG_COMPLETIONS: JsonCompletionRule[] = [
         documentation:
           'Default query context values applied to all queries on this broker. These override static defaults from runtime properties but are overridden by per-query context values. Useful for setting cluster-wide defaults like priority or timeout without restarting.',
       },
+      {
+        value: 'perSegmentTimeoutConfig',
+        documentation:
+          'Per-datasource per-segment timeout configuration. Maps datasource name to {perSegmentTimeoutMs, monitorOnly}. The broker injects the timeout into query context unless the caller set it explicitly.',
+      },
     ],
   },
   // Query context key suggestions
@@ -99,6 +104,21 @@ export const BROKER_DYNAMIC_CONFIG_COMPLETIONS: JsonCompletionRule[] = [
     path: '$.queryBlocklist.[*].dataSources.[]',
     completions: [{ value: 'example_datasource', documentation: 'Datasource name to block' }],
   },
+  // Per-segment timeout config per datasource
+  {
+    path: '$.perSegmentTimeoutConfig.*',
+    isObject: true,
+    completions: [
+      {
+        value: 'perSegmentTimeoutMs',
+        documentation: 'Per-segment timeout in milliseconds (required, must be > 0)',
+      },
+      {
+        value: 'monitorOnly',
+        documentation: 'If true, log the configured timeout without enforcing it (default: false)',
+      },
+    ],
+  },
   // Context matches example
   {
     path: '$.queryBlocklist.[*].contextMatches',
diff --git a/web-console/src/druid-models/broker-dynamic-config/broker-dynamic-config.tsx b/web-console/src/druid-models/broker-dynamic-config/broker-dynamic-config.tsx
index 9fc66cd4fa1..40fb05ddcc4 100644
--- a/web-console/src/druid-models/broker-dynamic-config/broker-dynamic-config.tsx
+++ b/web-console/src/druid-models/broker-dynamic-config/broker-dynamic-config.tsx
@@ -18,9 +18,15 @@
 
 import type { Field } from '../../components';
 
+export interface PerSegmentTimeoutConfig {
+  perSegmentTimeoutMs: number;
+  monitorOnly?: boolean;
+}
+
 export interface BrokerDynamicConfig {
   queryBlocklist?: QueryBlocklistRule[];
   queryContext?: Record<string, unknown>;
+  perSegmentTimeoutConfig?: Record<string, PerSegmentTimeoutConfig>;
 }
 
 export interface QueryBlocklistRule {
@@ -51,4 +57,16 @@ export const BROKER_DYNAMIC_CONFIG_FIELDS: Field<BrokerDynamicConfig>[] = [
       </>
     ),
   },
+  {
+    name: 'perSegmentTimeoutConfig',
+    type: 'json',
+    info: (
+      <>
+        Per-datasource per-segment timeout configuration. Maps datasource name to timeout settings.
+        When a query targets a configured datasource, the broker injects the per-segment timeout
+        into the query context (unless the caller already set it explicitly). Set{' '}
+        <code>monitorOnly: true</code> to log without enforcing.
+      </>
+    ),
+  },
 ];


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to