This is an automated email from the ASF dual-hosted git repository.
yqm pushed a commit to branch 37.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/37.0.0 by this push:
new b3bd2450e89 feat: Segment Timout Per Datasource (#19221) (#19291)
b3bd2450e89 is described below
commit b3bd2450e8952732c6e463b07d325326b8245805
Author: Cece Mei <[email protected]>
AuthorDate: Fri Apr 10 17:07:17 2026 -0700
feat: Segment Timout Per Datasource (#19221) (#19291)
Adds the ability to configure perSegmentTimeout per datasource via
BrokerDynamicConfig, so different per-segment timeout thresholds for can be set
for different datasources e.g. larger datasources may need different thresholds
than small ones.
Co-authored-by: mshahid6 <[email protected]>
---
docs/api-reference/dynamic-configuration-api.md | 72 ++++++-
.../org/apache/druid/server/QueryLifecycle.java | 54 ++++-
.../apache/druid/server/QueryLifecycleFactory.java | 16 +-
.../druid/server/broker/BrokerDynamicConfig.java | 41 +++-
.../server/broker/PerSegmentTimeoutConfig.java | 90 +++++++++
.../server/PerSegmentTimeoutInjectionTest.java | 223 +++++++++++++++++++++
.../apache/druid/server/QueryLifecycleTest.java | 3 +
.../org/apache/druid/server/QueryResourceTest.java | 1 +
.../server/broker/BrokerDynamicConfigTest.java | 31 +++
.../server/broker/PerSegmentTimeoutConfigTest.java | 82 ++++++++
.../broker-dynamic-config-completions.ts | 20 ++
.../broker-dynamic-config.tsx | 18 ++
12 files changed, 634 insertions(+), 17 deletions(-)
diff --git a/docs/api-reference/dynamic-configuration-api.md
b/docs/api-reference/dynamic-configuration-api.md
index 80433a54103..95166aa3c54 100644
--- a/docs/api-reference/dynamic-configuration-api.md
+++ b/docs/api-reference/dynamic-configuration-api.md
@@ -376,6 +376,12 @@ Host: http://ROUTER_IP:ROUTER_PORT
"queryContext": {
"priority": 0,
"timeout": 300000
+ },
+ "perSegmentTimeoutConfig": {
+ "large_table": {
+ "perSegmentTimeoutMs": 10000,
+ "monitorOnly": false
+ }
}
}
```
@@ -445,6 +451,15 @@ curl -X POST
"http://ROUTER_IP:ROUTER_PORT/druid/coordinator/v1/broker/config" \
"queryContext": {
"priority": 0,
"timeout": 300000
+ },
+ "perSegmentTimeoutConfig": {
+ "large_table": {
+ "perSegmentTimeoutMs": 10000
+ },
+ "huge_dataset": {
+ "perSegmentTimeoutMs": 5000,
+ "monitorOnly": true
+ }
}
}'
```
@@ -477,6 +492,15 @@ X-Druid-Comment: Add query blocklist rules and set default
context
"queryContext": {
"priority": 0,
"timeout": 300000
+ },
+ "perSegmentTimeoutConfig": {
+ "large_table": {
+ "perSegmentTimeoutMs": 10000
+ },
+ "huge_dataset": {
+ "perSegmentTimeoutMs": 5000,
+ "monitorOnly": true
+ }
}
}
```
@@ -496,6 +520,7 @@ The following table shows the dynamic configuration
properties for the Broker.
|--------|-----------|-------|
|`queryBlocklist`| List of rules to block queries based on datasource, query
type, and/or query context parameters. Each rule defines criteria that are
combined with AND logic. Blocked queries return an HTTP 403 error. See [Query
blocklist rules](#query-blocklist-rules) for details.|none|
|`queryContext`| Map of default query context key-value pairs applied to all
queries on this broker. These values override static defaults set via runtime
properties (`druid.query.default.context.*`) but are overridden by context
values supplied in individual query payloads. Useful for setting cluster-wide
defaults such as `priority` or `timeout` without restarting. See [Query context
reference](../querying/query-context-reference.md) for available keys.|none|
+|`perSegmentTimeoutConfig`| Map of datasource names to per-segment timeout
configurations. When a query targets a datasource in this map, the Broker
injects the configured `perSegmentTimeout` into the query context before
forwarding to Historicals. See [Per-segment timeout
configuration](#per-segment-timeout-configuration) for details.|none|
#### Query blocklist rules
@@ -531,6 +556,51 @@ When a query is blocked, the Broker returns an HTTP 403
error with a message ind
}
```
+#### Per-segment timeout configuration
+
+Per-segment timeout configuration allows operators to set per-datasource
segment processing timeouts without restarting the cluster. This is useful when
different datasources have different performance characteristics — for example,
allowing longer timeouts for larger datasets.
+
+Each entry in the `perSegmentTimeoutConfig` map is keyed by datasource name
and has the following properties:
+
+|Property|Description|Required|Default|
+|--------|-----------|--------|-------|
+|`perSegmentTimeoutMs`|Per-segment processing timeout in milliseconds. Must be
greater than 0.|Yes|N/A|
+|`monitorOnly`|When `true`, the timeout value is logged but not enforced.
Useful for observing the impact of a timeout before enabling
enforcement.|No|`false`|
+
+**Precedence order** (highest to lowest):
+
+1. User-supplied `perSegmentTimeout` in the query context
+2. Per-datasource value from `perSegmentTimeoutConfig`
+3. Cluster-wide default from `druid.query.default.context.perSegmentTimeout`
+
+> **Note:** For queries involving multiple datasources (e.g. joins or unions),
the timeout from the
+> first matching datasource is applied. The match order is non-deterministic.
+> To avoid this, configure the same timeout for all datasources involved in
such queries, or set `perSegmentTimeout` explicitly in the query context.
+
+**Example configuration:**
+
+```json
+{
+ "perSegmentTimeoutConfig": {
+ "large_datasource": {
+ "perSegmentTimeoutMs": 10000
+ },
+ "critical_datasource": {
+ "perSegmentTimeoutMs": 5000,
+ "monitorOnly": true
+ }
+ }
+}
+```
+
+To clear all per-datasource timeouts, POST an empty map:
+
+```json
+{
+ "perSegmentTimeoutConfig": {}
+}
+```
+
### Get broker dynamic configuration history
Retrieves the history of changes to Broker dynamic configuration over an
interval of time. Returns an empty array if there are no history records
available.
@@ -603,7 +673,7 @@ Host: http://ROUTER_IP:ROUTER_PORT
"comment": "Add query blocklist rules",
"ip": "127.0.0.1"
},
- "payload":
"{\"queryBlocklist\":[{\"ruleName\":\"block-expensive-scans\",\"dataSources\":[\"large_table\"],\"queryTypes\":[\"scan\"]}],\"queryContext\":{\"priority\":0,\"timeout\":300000}}",
+ "payload":
"{\"queryBlocklist\":[{\"ruleName\":\"block-expensive-scans\",\"dataSources\":[\"large_table\"],\"queryTypes\":[\"scan\"]}],\"queryContext\":{\"priority\":0,\"timeout\":300000},\"perSegmentTimeoutConfig\":{\"large_table\":{\"perSegmentTimeoutMs\":10000,\"monitorOnly\":false}}}",
"auditTime": "2024-03-06T12:00:00.000Z"
}
]
diff --git a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
index def4bb5b831..9a514b85e2d 100644
--- a/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
+++ b/server/src/main/java/org/apache/druid/server/QueryLifecycle.java
@@ -49,6 +49,7 @@ import org.apache.druid.query.QueryTimeoutException;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.query.context.ResponseContext;
import org.apache.druid.query.policy.PolicyEnforcer;
+import org.apache.druid.server.broker.PerSegmentTimeoutConfig;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthConfig;
@@ -63,6 +64,7 @@ import
org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
@@ -101,6 +103,7 @@ public class QueryLifecycle
private final AuthConfig authConfig;
private final PolicyEnforcer policyEnforcer;
private final List<QueryBlocklistRule> queryBlocklist;
+ private final Map<String, PerSegmentTimeoutConfig> perSegmentTimeoutConfig;
private final long startMs;
private final long startNs;
@@ -124,6 +127,7 @@ public class QueryLifecycle
final AuthConfig authConfig,
final PolicyEnforcer policyEnforcer,
final List<QueryBlocklistRule> queryBlocklist,
+ final Map<String, PerSegmentTimeoutConfig> perSegmentTimeoutConfig,
final long startMs,
final long startNs
)
@@ -138,6 +142,7 @@ public class QueryLifecycle
this.authConfig = authConfig;
this.policyEnforcer = policyEnforcer;
this.queryBlocklist = queryBlocklist;
+ this.perSegmentTimeoutConfig = perSegmentTimeoutConfig;
this.startMs = startMs;
this.startNs = startNs;
}
@@ -216,15 +221,52 @@ public class QueryLifecycle
queryId = UUID.randomUUID().toString();
}
- Map<String, Object> mergedUserAndConfigContext = QueryContexts.override(
- queryConfigProvider.getContext(),
- baseQuery.getContext()
- );
- mergedUserAndConfigContext.put(BaseQuery.QUERY_ID, queryId);
- this.baseQuery =
baseQuery.withOverriddenContext(mergedUserAndConfigContext);
+ // Start with system defaults, apply per-datasource override, then user
context wins
+ Map<String, Object> contextWithDefaults = new
HashMap<>(queryConfigProvider.getContext());
+ applyPerDatasourcePerSegmentTimeout(baseQuery, contextWithDefaults,
queryId);
+ Map<String, Object> finalContext =
QueryContexts.override(contextWithDefaults, baseQuery.getContext());
+ finalContext.put(BaseQuery.QUERY_ID, queryId);
+
+ this.baseQuery = baseQuery.withOverriddenContext(finalContext);
this.toolChest = conglomerate.getToolChest(this.baseQuery);
}
+ /**
+ * If a per-datasource per-segment timeout is configured, injects it into
the context defaults.
+ * User context (applied later via {@link QueryContexts#override}) will
override this if set explicitly.
+ * In monitorOnly mode, logs the configured timeout but does not inject it.
+ *
+ * For queries involving multiple datasources (e.g., joins or unions), the
timeout from the first matching datasource is applied
+ * since getTableNames() returns a Set, the match order is non-deterministic.
+ */
+ private void applyPerDatasourcePerSegmentTimeout(
+ final Query<?> query,
+ final Map<String, Object> contextWithDefaults,
+ final String queryId
+ )
+ {
+ if (perSegmentTimeoutConfig.isEmpty()) {
+ return;
+ }
+
+ for (String tableName : query.getDataSource().getTableNames()) {
+ PerSegmentTimeoutConfig dsConfig =
perSegmentTimeoutConfig.get(tableName);
+ if (dsConfig != null) {
+ if (dsConfig.isMonitorOnly()) {
+ log.debug(
+ "Per-segment timeout [%d ms] configured for datasource [%s] in
monitorOnly mode (not enforced) for query [%s].",
+ dsConfig.getPerSegmentTimeoutMs(),
+ tableName,
+ queryId
+ );
+ } else {
+ contextWithDefaults.put(QueryContexts.PER_SEGMENT_TIMEOUT_KEY,
dsConfig.getPerSegmentTimeoutMs());
+ }
+ return;
+ }
+ }
+ }
+
/**
* Returns {@link AuthorizationResult} based on {@code
DRUID_AUTHENTICATION_RESULT} in the given request, base query
* would be transformed with restrictions on the AuthorizationResult.
diff --git
a/server/src/main/java/org/apache/druid/server/QueryLifecycleFactory.java
b/server/src/main/java/org/apache/druid/server/QueryLifecycleFactory.java
index b32a5aafffb..57fcd1b47e9 100644
--- a/server/src/main/java/org/apache/druid/server/QueryLifecycleFactory.java
+++ b/server/src/main/java/org/apache/druid/server/QueryLifecycleFactory.java
@@ -28,6 +28,7 @@ import org.apache.druid.query.QueryConfigProvider;
import org.apache.druid.query.QueryRunnerFactoryConglomerate;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.policy.PolicyEnforcer;
+import org.apache.druid.server.broker.PerSegmentTimeoutConfig;
import org.apache.druid.server.log.RequestLogger;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthorizerMapper;
@@ -35,6 +36,7 @@ import org.apache.druid.server.security.AuthorizerMapper;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
@LazySingleton
public class QueryLifecycleFactory
@@ -78,10 +80,15 @@ public class QueryLifecycleFactory
public QueryLifecycle factorize()
{
- final List<QueryBlocklistRule> queryBlocklist =
- brokerViewOfBrokerConfig != null &&
brokerViewOfBrokerConfig.getDynamicConfig() != null
- ? brokerViewOfBrokerConfig.getDynamicConfig().getQueryBlocklist()
- : Collections.emptyList();
+ final List<QueryBlocklistRule> queryBlocklist;
+ final Map<String, PerSegmentTimeoutConfig> perSegmentTimeoutConfig;
+ if (brokerViewOfBrokerConfig != null &&
brokerViewOfBrokerConfig.getDynamicConfig() != null) {
+ queryBlocklist =
brokerViewOfBrokerConfig.getDynamicConfig().getQueryBlocklist();
+ perSegmentTimeoutConfig =
brokerViewOfBrokerConfig.getDynamicConfig().getPerSegmentTimeoutConfig();
+ } else {
+ queryBlocklist = Collections.emptyList();
+ perSegmentTimeoutConfig = Collections.emptyMap();
+ }
return new QueryLifecycle(
conglomerate,
@@ -94,6 +101,7 @@ public class QueryLifecycleFactory
authConfig,
policyEnforcer,
queryBlocklist,
+ perSegmentTimeoutConfig,
System.currentTimeMillis(),
System.nanoTime()
);
diff --git
a/server/src/main/java/org/apache/druid/server/broker/BrokerDynamicConfig.java
b/server/src/main/java/org/apache/druid/server/broker/BrokerDynamicConfig.java
index 45c646be8a7..57200e729d3 100644
---
a/server/src/main/java/org/apache/druid/server/broker/BrokerDynamicConfig.java
+++
b/server/src/main/java/org/apache/druid/server/broker/BrokerDynamicConfig.java
@@ -28,6 +28,7 @@ import org.apache.druid.server.QueryBlocklistRule;
import javax.annotation.Nullable;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
/**
@@ -52,14 +53,24 @@ public class BrokerDynamicConfig
*/
private final QueryContext queryContext;
+ /**
+ * Per-datasource per-segment timeout configuration. Maps datasource name to
its
+ * {@link PerSegmentTimeoutConfig}. When a query targets a datasource in
this map,
+ * the broker injects the configured {@code perSegmentTimeout} into the
query context
+ * (unless the caller already set it explicitly).
+ */
+ private final Map<String, PerSegmentTimeoutConfig> perSegmentTimeoutConfig;
+
@JsonCreator
public BrokerDynamicConfig(
@JsonProperty("queryBlocklist") @Nullable List<QueryBlocklistRule>
queryBlocklist,
- @JsonProperty("queryContext") @Nullable QueryContext queryContext
+ @JsonProperty("queryContext") @Nullable QueryContext queryContext,
+ @JsonProperty("perSegmentTimeoutConfig") @Nullable Map<String,
PerSegmentTimeoutConfig> perSegmentTimeoutConfig
)
{
this.queryBlocklist = Configs.valueOrDefault(queryBlocklist,
Collections.emptyList());
this.queryContext = Configs.valueOrDefault(queryContext,
QueryContext.empty());
+ this.perSegmentTimeoutConfig =
Configs.valueOrDefault(perSegmentTimeoutConfig, Collections.emptyMap());
}
@JsonProperty
@@ -74,6 +85,12 @@ public class BrokerDynamicConfig
return queryContext;
}
+ @JsonProperty
+ public Map<String, PerSegmentTimeoutConfig> getPerSegmentTimeoutConfig()
+ {
+ return perSegmentTimeoutConfig;
+ }
+
@Override
public boolean equals(Object o)
{
@@ -85,13 +102,14 @@ public class BrokerDynamicConfig
}
BrokerDynamicConfig that = (BrokerDynamicConfig) o;
return Objects.equals(queryBlocklist, that.queryBlocklist)
- && Objects.equals(queryContext, that.queryContext);
+ && Objects.equals(queryContext, that.queryContext)
+ && Objects.equals(perSegmentTimeoutConfig,
that.perSegmentTimeoutConfig);
}
@Override
public int hashCode()
{
- return Objects.hash(queryBlocklist, queryContext);
+ return Objects.hash(queryBlocklist, queryContext, perSegmentTimeoutConfig);
}
@Override
@@ -100,6 +118,7 @@ public class BrokerDynamicConfig
return "BrokerDynamicConfig{" +
"queryBlocklist=" + queryBlocklist +
", queryContext=" + queryContext +
+ ", perSegmentTimeoutConfig=" + perSegmentTimeoutConfig +
'}';
}
@@ -112,6 +131,7 @@ public class BrokerDynamicConfig
{
private List<QueryBlocklistRule> queryBlocklist;
private QueryContext queryContext;
+ private Map<String, PerSegmentTimeoutConfig> perSegmentTimeoutConfig;
public Builder()
{
@@ -120,11 +140,13 @@ public class BrokerDynamicConfig
@JsonCreator
public Builder(
@JsonProperty("queryBlocklist") @Nullable List<QueryBlocklistRule>
queryBlocklist,
- @JsonProperty("queryContext") @Nullable QueryContext queryContext
+ @JsonProperty("queryContext") @Nullable QueryContext queryContext,
+ @JsonProperty("perSegmentTimeoutConfig") @Nullable Map<String,
PerSegmentTimeoutConfig> perSegmentTimeoutConfig
)
{
this.queryBlocklist = queryBlocklist;
this.queryContext = queryContext;
+ this.perSegmentTimeoutConfig = perSegmentTimeoutConfig;
}
public Builder withQueryBlocklist(List<QueryBlocklistRule> queryBlocklist)
@@ -139,9 +161,15 @@ public class BrokerDynamicConfig
return this;
}
+ public Builder withPerSegmentTimeoutConfig(Map<String,
PerSegmentTimeoutConfig> perSegmentTimeoutConfig)
+ {
+ this.perSegmentTimeoutConfig = perSegmentTimeoutConfig;
+ return this;
+ }
+
public BrokerDynamicConfig build()
{
- return new BrokerDynamicConfig(queryBlocklist, queryContext);
+ return new BrokerDynamicConfig(queryBlocklist, queryContext,
perSegmentTimeoutConfig);
}
/**
@@ -152,7 +180,8 @@ public class BrokerDynamicConfig
{
return new BrokerDynamicConfig(
Configs.valueOrDefault(queryBlocklist, defaults != null ?
defaults.getQueryBlocklist() : null),
- Configs.valueOrDefault(queryContext, defaults != null ?
defaults.getQueryContext() : null)
+ Configs.valueOrDefault(queryContext, defaults != null ?
defaults.getQueryContext() : null),
+ Configs.valueOrDefault(perSegmentTimeoutConfig, defaults != null ?
defaults.getPerSegmentTimeoutConfig() : null)
);
}
}
diff --git
a/server/src/main/java/org/apache/druid/server/broker/PerSegmentTimeoutConfig.java
b/server/src/main/java/org/apache/druid/server/broker/PerSegmentTimeoutConfig.java
new file mode 100644
index 00000000000..58461a4996d
--- /dev/null
+++
b/server/src/main/java/org/apache/druid/server/broker/PerSegmentTimeoutConfig.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.broker;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.base.Preconditions;
+import org.apache.druid.common.config.Configs;
+
+import javax.annotation.Nullable;
+import java.util.Objects;
+
+/**
+ * Per-datasource configuration for per-segment timeout on the broker.
+ * Used within {@link BrokerDynamicConfig#getPerSegmentTimeoutConfig()}.
+ */
+public class PerSegmentTimeoutConfig
+{
+ private final long perSegmentTimeoutMs;
+ private final boolean monitorOnly;
+
+ @JsonCreator
+ public PerSegmentTimeoutConfig(
+ @JsonProperty("perSegmentTimeoutMs") long perSegmentTimeoutMs,
+ @JsonProperty("monitorOnly") @Nullable Boolean monitorOnly
+ )
+ {
+ Preconditions.checkArgument(perSegmentTimeoutMs > 0, "perSegmentTimeoutMs
must be > 0, got [%s]", perSegmentTimeoutMs);
+ this.perSegmentTimeoutMs = perSegmentTimeoutMs;
+ this.monitorOnly = Configs.valueOrDefault(monitorOnly, false);
+ }
+
+ @JsonProperty
+ public long getPerSegmentTimeoutMs()
+ {
+ return perSegmentTimeoutMs;
+ }
+
+ @JsonProperty
+ public boolean isMonitorOnly()
+ {
+ return monitorOnly;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ PerSegmentTimeoutConfig that = (PerSegmentTimeoutConfig) o;
+ return perSegmentTimeoutMs == that.perSegmentTimeoutMs
+ && monitorOnly == that.monitorOnly;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(perSegmentTimeoutMs, monitorOnly);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "PerSegmentTimeoutConfig{" +
+ "perSegmentTimeoutMs=" + perSegmentTimeoutMs +
+ ", monitorOnly=" + monitorOnly +
+ '}';
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/PerSegmentTimeoutInjectionTest.java
b/server/src/test/java/org/apache/druid/server/PerSegmentTimeoutInjectionTest.java
new file mode 100644
index 00000000000..440f065ba48
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/PerSegmentTimeoutInjectionTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server;
+
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.emitter.service.ServiceEmitter;
+import org.apache.druid.query.Druids;
+import org.apache.druid.query.GenericQueryMetricsFactory;
+import org.apache.druid.query.QueryConfigProvider;
+import org.apache.druid.query.QueryContexts;
+import org.apache.druid.query.QueryRunnerFactoryConglomerate;
+import org.apache.druid.query.QuerySegmentWalker;
+import org.apache.druid.query.QueryToolChest;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.policy.NoopPolicyEnforcer;
+import org.apache.druid.query.timeseries.TimeseriesQuery;
+import org.apache.druid.server.broker.PerSegmentTimeoutConfig;
+import org.apache.druid.server.log.RequestLogger;
+import org.apache.druid.server.security.AuthConfig;
+import org.apache.druid.server.security.AuthorizerMapper;
+import org.easymock.EasyMock;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class PerSegmentTimeoutInjectionTest
+{
+ private static final String DATASOURCE = "my_datasource";
+
+ private QueryRunnerFactoryConglomerate conglomerate;
+ private QuerySegmentWalker texasRanger;
+ private GenericQueryMetricsFactory metricsFactory;
+ private ServiceEmitter emitter;
+ private RequestLogger requestLogger;
+ private AuthorizerMapper authzMapper;
+ private QueryConfigProvider queryConfig;
+ private QueryToolChest toolChest;
+
+ private final TimeseriesQuery baseQuery = Druids.newTimeseriesQueryBuilder()
+ .dataSource(DATASOURCE)
+ .intervals(List.of(Intervals.ETERNITY))
+ .aggregators(new CountAggregatorFactory("count"))
+ .build();
+
+ @Before
+ public void setUp()
+ {
+ conglomerate = EasyMock.createMock(QueryRunnerFactoryConglomerate.class);
+ texasRanger = EasyMock.createMock(QuerySegmentWalker.class);
+ metricsFactory = EasyMock.createMock(GenericQueryMetricsFactory.class);
+ emitter = EasyMock.createMock(ServiceEmitter.class);
+ requestLogger = EasyMock.createNiceMock(RequestLogger.class);
+ authzMapper = EasyMock.createNiceMock(AuthorizerMapper.class);
+ queryConfig = EasyMock.createMock(QueryConfigProvider.class);
+ toolChest = EasyMock.createNiceMock(QueryToolChest.class);
+ }
+
+ @After
+ public void tearDown()
+ {
+ EasyMock.verify(conglomerate, queryConfig);
+ }
+
+ @Test
+ public void testPerDatasourceTimeout_applied()
+ {
+ Map<String, PerSegmentTimeoutConfig> config = Map.of(
+ DATASOURCE, new PerSegmentTimeoutConfig(5000, false)
+ );
+
+ expectDefaults();
+
+ QueryLifecycle lifecycle = createLifecycle(config);
+ lifecycle.initialize(baseQuery);
+
+ Assert.assertEquals(5000L,
lifecycle.getQuery().context().getPerSegmentTimeout());
+ }
+
+ @Test
+ public void testPerDatasourceTimeout_userOverrideWins()
+ {
+ Map<String, PerSegmentTimeoutConfig> config = Map.of(
+ DATASOURCE, new PerSegmentTimeoutConfig(5000, false)
+ );
+
+ expectDefaults();
+
+ TimeseriesQuery queryWithUserTimeout = baseQuery.withOverriddenContext(
+ Map.of(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 2000L)
+ );
+
+ QueryLifecycle lifecycle = createLifecycle(config);
+ lifecycle.initialize(queryWithUserTimeout);
+
+ Assert.assertEquals(2000L,
lifecycle.getQuery().context().getPerSegmentTimeout());
+ }
+
+ @Test
+ public void testPerDatasourceTimeout_monitorOnlyDoesNotInject()
+ {
+ // monitorOnly=true: config exists but should NOT be enforced
+ Map<String, PerSegmentTimeoutConfig> config = Map.of(
+ DATASOURCE, new PerSegmentTimeoutConfig(5000, true)
+ );
+
+ expectDefaults();
+
+ QueryLifecycle lifecycle = createLifecycle(config);
+ lifecycle.initialize(baseQuery);
+
+ Assert.assertFalse(
+ "monitorOnly should not inject perSegmentTimeout",
+ lifecycle.getQuery().context().usePerSegmentTimeout()
+ );
+ }
+
+ @Test
+ public void testPerDatasourceTimeout_noMatchingDatasource()
+ {
+ Map<String, PerSegmentTimeoutConfig> config = Map.of(
+ "other_datasource", new PerSegmentTimeoutConfig(5000, false)
+ );
+
+ expectDefaults();
+
+ QueryLifecycle lifecycle = createLifecycle(config);
+ lifecycle.initialize(baseQuery);
+
+ Assert.assertFalse(lifecycle.getQuery().context().usePerSegmentTimeout());
+ }
+
+ @Test
+ public void testPerDatasourceTimeout_overridesSystemDefault()
+ {
+ Map<String, PerSegmentTimeoutConfig> config = Map.of(
+ DATASOURCE, new PerSegmentTimeoutConfig(5000, false)
+ );
+
+ // System default sets perSegmentTimeout to 10000
+ EasyMock.expect(queryConfig.getContext())
+ .andReturn(Map.of(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 10000L))
+ .anyTimes();
+
EasyMock.expect(conglomerate.getToolChest(EasyMock.anyObject())).andReturn(toolChest).once();
+ EasyMock.replay(conglomerate, queryConfig);
+
+ QueryLifecycle lifecycle = createLifecycle(config);
+ lifecycle.initialize(baseQuery);
+
+ Assert.assertEquals(5000L,
lifecycle.getQuery().context().getPerSegmentTimeout());
+ }
+
+ @Test
+ public void testPrecedence_userOverridesPerDatasourceOverridesSystemDefault()
+ {
+ // System default: 10000, per-datasource: 5000, user: 2000 — user should
win
+ Map<String, PerSegmentTimeoutConfig> config = Map.of(
+ DATASOURCE, new PerSegmentTimeoutConfig(5000, false)
+ );
+
+ EasyMock.expect(queryConfig.getContext())
+ .andReturn(Map.of(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 10000L))
+ .anyTimes();
+
EasyMock.expect(conglomerate.getToolChest(EasyMock.anyObject())).andReturn(toolChest).once();
+ EasyMock.replay(conglomerate, queryConfig);
+
+ TimeseriesQuery queryWithUserTimeout = baseQuery.withOverriddenContext(
+ Map.of(QueryContexts.PER_SEGMENT_TIMEOUT_KEY, 2000L)
+ );
+
+ QueryLifecycle lifecycle = createLifecycle(config);
+ lifecycle.initialize(queryWithUserTimeout);
+
+ Assert.assertEquals(2000L,
lifecycle.getQuery().context().getPerSegmentTimeout());
+ }
+
+ private void expectDefaults()
+ {
+ EasyMock.expect(queryConfig.getContext()).andReturn(Map.of()).anyTimes();
+
EasyMock.expect(conglomerate.getToolChest(EasyMock.anyObject())).andReturn(toolChest).once();
+ EasyMock.replay(conglomerate, queryConfig);
+ }
+
+ private QueryLifecycle createLifecycle(Map<String, PerSegmentTimeoutConfig>
perSegmentTimeoutConfig)
+ {
+ return new QueryLifecycle(
+ conglomerate,
+ texasRanger,
+ metricsFactory,
+ emitter,
+ requestLogger,
+ authzMapper,
+ queryConfig,
+ new AuthConfig(),
+ NoopPolicyEnforcer.instance(),
+ Collections.emptyList(),
+ perSegmentTimeoutConfig,
+ System.currentTimeMillis(),
+ System.nanoTime()
+ );
+ }
+}
diff --git
a/server/src/test/java/org/apache/druid/server/QueryLifecycleTest.java
b/server/src/test/java/org/apache/druid/server/QueryLifecycleTest.java
index b478ab9befc..96880147f40 100644
--- a/server/src/test/java/org/apache/druid/server/QueryLifecycleTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryLifecycleTest.java
@@ -80,6 +80,7 @@ import org.junit.rules.ExpectedException;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -859,6 +860,7 @@ public class QueryLifecycleTest
authConfig,
policyEnforcer,
queryBlocklist,
+ Collections.emptyMap(),
System.currentTimeMillis(),
System.nanoTime()
);
@@ -910,6 +912,7 @@ public class QueryLifecycleTest
authConfig,
policyEnforcer,
queryBlocklist,
+ Collections.emptyMap(),
System.currentTimeMillis(),
System.nanoTime()
);
diff --git
a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
index bf16044943e..d7a611bfc32 100644
--- a/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
+++ b/server/src/test/java/org/apache/druid/server/QueryResourceTest.java
@@ -776,6 +776,7 @@ public class QueryResourceTest
new AuthConfig(),
NoopPolicyEnforcer.instance(),
null,
+ Collections.emptyMap(),
System.currentTimeMillis(),
System.nanoTime()
)
diff --git
a/server/src/test/java/org/apache/druid/server/broker/BrokerDynamicConfigTest.java
b/server/src/test/java/org/apache/druid/server/broker/BrokerDynamicConfigTest.java
index 06e4df77c0a..425ccbdd14e 100644
---
a/server/src/test/java/org/apache/druid/server/broker/BrokerDynamicConfigTest.java
+++
b/server/src/test/java/org/apache/druid/server/broker/BrokerDynamicConfigTest.java
@@ -31,6 +31,7 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.List;
+import java.util.Map;
public class BrokerDynamicConfigTest
{
@@ -142,6 +143,36 @@ public class BrokerDynamicConfigTest
Assert.assertTrue(actual.getQueryContext().isEmpty());
}
+ @Test
+ public void testSerdeWithPerSegmentTimeoutConfig() throws Exception
+ {
+ String jsonStr = "{\n"
+ + " \"perSegmentTimeoutConfig\": {\n"
+ + " \"my_large_ds\": {\"perSegmentTimeoutMs\": 5000,
\"monitorOnly\": true},\n"
+ + " \"my_other_ds\": {\"perSegmentTimeoutMs\": 3000}\n"
+ + " }\n"
+ + "}\n";
+
+ BrokerDynamicConfig actual = mapper.readValue(
+ mapper.writeValueAsString(mapper.readValue(jsonStr,
BrokerDynamicConfig.class)),
+ BrokerDynamicConfig.class
+ );
+
+ Map<String, PerSegmentTimeoutConfig> expected = ImmutableMap.of(
+ "my_large_ds", new PerSegmentTimeoutConfig(5000, true),
+ "my_other_ds", new PerSegmentTimeoutConfig(3000, null)
+ );
+ Assert.assertEquals(expected, actual.getPerSegmentTimeoutConfig());
+ }
+
+ @Test
+ public void testNullPerSegmentTimeoutConfigDefaultsToEmptyMap() throws
Exception
+ {
+ BrokerDynamicConfig actual = mapper.readValue("{}",
BrokerDynamicConfig.class);
+ Assert.assertNotNull(actual.getPerSegmentTimeoutConfig());
+ Assert.assertTrue(actual.getPerSegmentTimeoutConfig().isEmpty());
+ }
+
@Test
public void testEquals()
{
diff --git
a/server/src/test/java/org/apache/druid/server/broker/PerSegmentTimeoutConfigTest.java
b/server/src/test/java/org/apache/druid/server/broker/PerSegmentTimeoutConfigTest.java
new file mode 100644
index 00000000000..80ceddf7e5e
--- /dev/null
+++
b/server/src/test/java/org/apache/druid/server/broker/PerSegmentTimeoutConfigTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.broker;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.segment.TestHelper;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class PerSegmentTimeoutConfigTest
+{
+ private final ObjectMapper mapper = TestHelper.makeJsonMapper();
+
+ @Test
+ public void testSerde() throws Exception
+ {
+ String json = "{\"perSegmentTimeoutMs\": 5000, \"monitorOnly\": true}";
+ PerSegmentTimeoutConfig config = mapper.readValue(json,
PerSegmentTimeoutConfig.class);
+ Assert.assertEquals(5000L, config.getPerSegmentTimeoutMs());
+ Assert.assertTrue(config.isMonitorOnly());
+
+ // Round-trip
+ PerSegmentTimeoutConfig roundTripped = mapper.readValue(
+ mapper.writeValueAsString(config),
+ PerSegmentTimeoutConfig.class
+ );
+ Assert.assertEquals(config, roundTripped);
+ }
+
+ @Test
+ public void testMonitorOnlyDefaultsToFalse() throws Exception
+ {
+ String json = "{\"perSegmentTimeoutMs\": 3000}";
+ PerSegmentTimeoutConfig config = mapper.readValue(json,
PerSegmentTimeoutConfig.class);
+ Assert.assertEquals(3000L, config.getPerSegmentTimeoutMs());
+ Assert.assertFalse(config.isMonitorOnly());
+ }
+
+ @Test
+ public void testRejectsZeroTimeout()
+ {
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> new PerSegmentTimeoutConfig(0, null)
+ );
+ }
+
+ @Test
+ public void testRejectsNegativeTimeout()
+ {
+ Assert.assertThrows(
+ IllegalArgumentException.class,
+ () -> new PerSegmentTimeoutConfig(-1, null)
+ );
+ }
+
+ @Test
+ public void testEquals()
+ {
+ EqualsVerifier.forClass(PerSegmentTimeoutConfig.class)
+ .usingGetClass()
+ .verify();
+ }
+}
diff --git
a/web-console/src/dialogs/broker-dynamic-config-dialog/broker-dynamic-config-completions.ts
b/web-console/src/dialogs/broker-dynamic-config-dialog/broker-dynamic-config-completions.ts
index 6620dc721f0..f38664319d6 100644
---
a/web-console/src/dialogs/broker-dynamic-config-dialog/broker-dynamic-config-completions.ts
+++
b/web-console/src/dialogs/broker-dynamic-config-dialog/broker-dynamic-config-completions.ts
@@ -34,6 +34,11 @@ export const BROKER_DYNAMIC_CONFIG_COMPLETIONS:
JsonCompletionRule[] = [
documentation:
'Default query context values applied to all queries on this broker.
These override static defaults from runtime properties but are overridden by
per-query context values. Useful for setting cluster-wide defaults like
priority or timeout without restarting.',
},
+ {
+ value: 'perSegmentTimeoutConfig',
+ documentation:
+ 'Per-datasource per-segment timeout configuration. Maps datasource
name to {perSegmentTimeoutMs, monitorOnly}. The broker injects the timeout into
query context unless the caller set it explicitly.',
+ },
],
},
// Query context key suggestions
@@ -99,6 +104,21 @@ export const BROKER_DYNAMIC_CONFIG_COMPLETIONS:
JsonCompletionRule[] = [
path: '$.queryBlocklist.[*].dataSources.[]',
completions: [{ value: 'example_datasource', documentation: 'Datasource
name to block' }],
},
+ // Per-segment timeout config per datasource
+ {
+ path: '$.perSegmentTimeoutConfig.*',
+ isObject: true,
+ completions: [
+ {
+ value: 'perSegmentTimeoutMs',
+ documentation: 'Per-segment timeout in milliseconds (required, must be
> 0)',
+ },
+ {
+ value: 'monitorOnly',
+ documentation: 'If true, log the configured timeout without enforcing
it (default: false)',
+ },
+ ],
+ },
// Context matches example
{
path: '$.queryBlocklist.[*].contextMatches',
diff --git
a/web-console/src/druid-models/broker-dynamic-config/broker-dynamic-config.tsx
b/web-console/src/druid-models/broker-dynamic-config/broker-dynamic-config.tsx
index 9fc66cd4fa1..40fb05ddcc4 100644
---
a/web-console/src/druid-models/broker-dynamic-config/broker-dynamic-config.tsx
+++
b/web-console/src/druid-models/broker-dynamic-config/broker-dynamic-config.tsx
@@ -18,9 +18,15 @@
import type { Field } from '../../components';
+export interface PerSegmentTimeoutConfig {
+ perSegmentTimeoutMs: number;
+ monitorOnly?: boolean;
+}
+
export interface BrokerDynamicConfig {
queryBlocklist?: QueryBlocklistRule[];
queryContext?: Record<string, unknown>;
+ perSegmentTimeoutConfig?: Record<string, PerSegmentTimeoutConfig>;
}
export interface QueryBlocklistRule {
@@ -51,4 +57,16 @@ export const BROKER_DYNAMIC_CONFIG_FIELDS:
Field<BrokerDynamicConfig>[] = [
</>
),
},
+ {
+ name: 'perSegmentTimeoutConfig',
+ type: 'json',
+ info: (
+ <>
+ Per-datasource per-segment timeout configuration. Maps datasource name
to timeout settings.
+ When a query targets a configured datasource, the broker injects the
per-segment timeout
+ into the query context (unless the caller already set it explicitly).
Set{' '}
+ <code>monitorOnly: true</code> to log without enforcing.
+ </>
+ ),
+ },
];
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]