This is an automated email from the ASF dual-hosted git repository.
capistrant pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 81510abfb96 Revert "feat: Deprioritize turbo loading historicals for broker query routing…" (#19355)
81510abfb96 is described below
commit 81510abfb9648f3519c5f549362392c235565685
Author: Lucas Capistrant <[email protected]>
AuthorDate: Mon Apr 20 13:36:36 2026 -0500
Revert "feat: Deprioritize turbo loading historicals for broker query routing…" (#19355)
This reverts commit 1f1cca6b0ead924e68a44fefbfe6a935bfdee118.
---
docs/configuration/index.md | 2 +-
.../client/BrokerViewOfCoordinatorConfig.java | 82 ++-----
.../client/BrokerViewOfCoordinatorConfigTest.java | 238 ---------------------
3 files changed, 16 insertions(+), 306 deletions(-)
diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 59ad60a5be8..e309126863a 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -833,7 +833,7 @@ The following table shows the dynamic configuration properties for the Coordinat
|`decommissioningNodes`|List of Historical servers to decommission.
Coordinator will not assign new segments to decommissioning servers, and
segments will be moved away from them to be placed on non-decommissioning
servers at the maximum rate specified by `maxSegmentsToMove`.|none|
|`pauseCoordination`|Boolean flag for whether or not the Coordinator should
execute its various duties of coordinating the cluster. Setting this to true
essentially pauses all coordination work while allowing the API to remain up.
Duties that are paused include all classes that implement the `CoordinatorDuty`
interface. Such duties include: segment balancing, segment compaction,
submitting kill tasks for unused segments (if enabled), logging of used
segments in the cluster, marking of ne [...]
|`replicateAfterLoadTimeout`|Boolean flag for whether or not additional
replication is needed for segments that have failed to load due to the expiry
of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator
will attempt to replicate the failed segment on a different historical server.
This helps improve the segment availability if there are a few slow Historicals
in the cluster. However, the slow Historical may still load the segment later
and the Coordinator may issu [...]
-|`turboLoadingNodes`| Experimental. List of Historical servers to place in
turbo loading mode. These servers use a larger thread-pool to load segments
faster but at the cost of query performance. For servers specified in
`turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is
ignored and the coordinator uses the value of the respective
`numLoadingThreads` instead.<br/>While a server is in `turboLoadingNodes`,
Brokers demote it to the lowest query priority: for any segmen [...]
+|`turboLoadingNodes`| Experimental. List of Historical servers to place in
turbo loading mode. These servers use a larger thread-pool to load segments
faster but at the cost of query performance. For servers specified in
`turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is
ignored and the coordinator uses the value of the respective
`numLoadingThreads` instead.<br/>Please use this config with caution. All
servers should eventually be removed from this list once the se [...]
|`cloneServers`| Experimental. Map from target Historical server to source
Historical server which should be cloned by the target. The target Historical
does not participate in regular segment assignment or balancing. Instead, the
Coordinator mirrors any segment assignment made to the source Historical onto
the target Historical, so that the target becomes an exact copy of the source.
Segments on the target Historical do not count towards replica counts either.
If the source disappears, [...]
|`historicalTierAliases`|Map from a virtual tier name to the set of real
Historical tier names it expands to. When a load/drop rule references a virtual
alias tier, the Coordinator replaces it with its real tiers — each receiving
the full replica count independently. The alias key itself is never loaded to
directly. For example, `{"hot": ["hot_1", "hot_2"]}` causes a rule of `{"hot":
2}` to load 2 replicas on each of `hot_1` and `hot_2`; `hot` receives no direct
assignment. An alias valu [...]
diff --git a/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java b/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java
index 316378116e5..f09a51a06b3 100644
--- a/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java
+++ b/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java
@@ -41,10 +41,9 @@ import org.apache.druid.server.BrokerDynamicConfigResource;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import javax.validation.constraints.NotNull;
-import java.util.Comparator;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* Broker view of the coordinator dynamic configuration, and its derived
values such as target and source clone servers.
@@ -59,8 +58,6 @@ public class BrokerViewOfCoordinatorConfig extends BaseBrokerViewOfConfig<Coordi
private Set<String> targetCloneServers;
@GuardedBy("this")
private Set<String> sourceCloneServers;
- @GuardedBy("this")
- private Set<String> turboLoadingNodes;
@Inject
public BrokerViewOfCoordinatorConfig(
@@ -100,7 +97,7 @@ public class BrokerViewOfCoordinatorConfig extends BaseBrokerViewOfConfig<Coordi
/**
* Update the config view with a new coordinator dynamic config snapshot.
Also updates the source and target clone
- * servers and the turbo-loading nodes based on the new dynamic
configuration.
+ * servers based on the new dynamic configuration.
*/
@Override
public synchronized void setDynamicConfig(@NotNull CoordinatorDynamicConfig
updatedConfig)
@@ -109,7 +106,6 @@ public class BrokerViewOfCoordinatorConfig extends BaseBrokerViewOfConfig<Coordi
final Map<String, String> cloneServers = updatedConfig.getCloneServers();
this.targetCloneServers = ImmutableSet.copyOf(cloneServers.keySet());
this.sourceCloneServers = ImmutableSet.copyOf(cloneServers.values());
- this.turboLoadingNodes =
ImmutableSet.copyOf(updatedConfig.getTurboLoadingNodes());
}
@Override
@@ -118,70 +114,37 @@ public class BrokerViewOfCoordinatorConfig extends BaseBrokerViewOfConfig<Coordi
CloneQueryMode mode
)
{
- final Set<String> serversToIgnore;
- final Set<String> turboNodes;
- synchronized (this) {
- serversToIgnore = getCurrentServersToIgnore(mode);
- turboNodes = turboLoadingNodes == null ? Set.of() : turboLoadingNodes;
- }
+ final Set<String> serversToIgnore = getCurrentServersToIgnore(mode);
- if (serversToIgnore.isEmpty() && turboNodes.isEmpty()) {
+ if (serversToIgnore.isEmpty()) {
return historicalServers;
}
- // Preserve the comparator from the input map so the tier selection
strategy's ordering is respected.
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> filteredHistoricals =
- new Int2ObjectRBTreeMap<>(historicalServers.comparator());
-
- final Set<QueryableDruidServer> deprioritizedTurboHistoricals = new
HashSet<>();
-
+ final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> filteredHistoricals =
new Int2ObjectRBTreeMap<>();
for (int priority : historicalServers.keySet()) {
- final Set<QueryableDruidServer> preferredHistoricals = new HashSet<>();
- for (QueryableDruidServer server : historicalServers.get(priority)) {
- final String host = server.getServer().getHost();
- if (serversToIgnore.contains(host)) {
- // Clone filtering removes the server entirely from the queryable
set.
- continue;
- }
- if (turboNodes.contains(host)) {
- // Turbo-loading servers are demoted to a dead-last bucket so
queries prefer
- // non-turbo replicas, but still fall back to turbo replicas when no
alternative exists.
- deprioritizedTurboHistoricals.add(server);
- continue;
- }
- preferredHistoricals.add(server);
- }
- if (!preferredHistoricals.isEmpty()) {
- filteredHistoricals.put(priority, preferredHistoricals);
- }
- }
-
- if (!deprioritizedTurboHistoricals.isEmpty()) {
- final int deadLastPriority =
computeDeadLastPriority(historicalServers.comparator());
- // If a real tier already occupies MIN/MAX_VALUE, merge rather than
overwrite.
- filteredHistoricals.merge(deadLastPriority,
deprioritizedTurboHistoricals, (existing, toAdd) -> {
- final Set<QueryableDruidServer> merged = new HashSet<>(existing);
- merged.addAll(toAdd);
- return merged;
- });
+ Set<QueryableDruidServer> servers = historicalServers.get(priority);
+ filteredHistoricals.put(priority,
+ servers.stream()
+ .filter(server ->
!serversToIgnore.contains(server.getServer().getHost()))
+ .collect(Collectors.toSet())
+ );
}
return filteredHistoricals;
}
/**
- * Get the set of servers that should not be queried based on the
cloneQueryMode parameter.
+ * Get the list of servers that should not be queried based on the
cloneQueryMode parameter.
*/
- @GuardedBy("this")
- private Set<String> getCurrentServersToIgnore(CloneQueryMode cloneQueryMode)
+ private synchronized Set<String> getCurrentServersToIgnore(CloneQueryMode
cloneQueryMode)
{
switch (cloneQueryMode) {
case PREFERCLONES:
// Remove servers being cloned targets, so that clones are queried.
- return sourceCloneServers == null ? Set.of() : sourceCloneServers;
+ return sourceCloneServers;
case EXCLUDECLONES:
// Remove clones, so that only source servers are queried.
- return targetCloneServers == null ? Set.of() : targetCloneServers;
+ return targetCloneServers;
case INCLUDECLONES:
// Don't remove either.
return Set.of();
@@ -189,19 +152,4 @@ public class BrokerViewOfCoordinatorConfig extends BaseBrokerViewOfConfig<Coordi
throw DruidException.defensive("Unexpected value of
cloneQueryMode[%s]", cloneQueryMode);
}
}
-
- /**
- * Pick a priority value that sorts LAST under the given comparator, so that
turbo-loading
- * servers are only considered when no other server has the segment.
- */
- private static int computeDeadLastPriority(Comparator<? super Integer> cmp)
- {
- if (cmp == null) {
- // Natural int ordering (ascending) → MAX_VALUE sorts last.
- return Integer.MAX_VALUE;
- }
- // If the comparator puts higher values first (e.g. reverseOrder), then
MIN_VALUE
- // sorts last; otherwise MAX_VALUE sorts last.
- return cmp.compare(1, 0) < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
- }
}
diff --git a/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java
index 86176888119..5a014531bf5 100644
--- a/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java
+++ b/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java
@@ -20,22 +20,14 @@
package org.apache.druid.client;
import com.google.common.util.concurrent.Futures;
-import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.query.CloneQueryMode;
-import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
import java.util.Map;
-import java.util.Set;
public class BrokerViewOfCoordinatorConfigTest
{
@@ -63,234 +55,4 @@ public class BrokerViewOfCoordinatorConfigTest
Mockito.verify(coordinatorClient,
Mockito.times(1)).getCoordinatorDynamicConfig();
Assert.assertEquals(config, target.getDynamicConfig());
}
-
- @Test
- public void
testGetQueryableServers_noFiltersOrTurbo_returnsOriginalMapReference()
- {
- target.setDynamicConfig(CoordinatorDynamicConfig.builder().build());
-
- final QueryableDruidServer server = historical("h1:8080", 0);
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
- new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
- input.put(0, Set.of(server));
-
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
- target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES);
-
- // Fast path: no filtering needed, return the original map reference
unchanged.
- Assert.assertSame(input, result);
- }
-
- @Test
- public void
testGetQueryableServers_turboNodeDemoted_highestPriorityStrategy()
- {
- target.setDynamicConfig(
- CoordinatorDynamicConfig
- .builder()
- .withTurboLoadingNodes(Set.of("turbo:8080"))
- .build()
- );
-
- final QueryableDruidServer nonTurboHighTier =
historical("normal-high:8080", 10);
- final QueryableDruidServer nonTurboLowTier = historical("normal-low:8080",
5);
- final QueryableDruidServer turbo = historical("turbo:8080", 10);
-
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
- new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
- input.put(10, Set.of(nonTurboHighTier, turbo));
- input.put(5, Set.of(nonTurboLowTier));
-
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
- target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES);
-
- // Expected iteration order for highest-priority-first comparator:
- // 10 -> [nonTurboHighTier]
- // 5 -> [nonTurboLowTier]
- // MIN -> [turbo] (dead last)
- Assert.assertEquals(
- Arrays.asList(10, 5, Integer.MIN_VALUE),
- new ArrayList<>(result.keySet())
- );
- Assert.assertEquals(Set.of(nonTurboHighTier), result.get(10));
- Assert.assertEquals(Set.of(nonTurboLowTier), result.get(5));
- Assert.assertEquals(Set.of(turbo), result.get(Integer.MIN_VALUE));
- }
-
- @Test
- public void testGetQueryableServers_turboNodeDemoted_lowestPriorityStrategy()
- {
- target.setDynamicConfig(
- CoordinatorDynamicConfig
- .builder()
- .withTurboLoadingNodes(Set.of("turbo:8080"))
- .build()
- );
-
- final QueryableDruidServer nonTurboLow = historical("normal-low:8080", 5);
- final QueryableDruidServer nonTurboHigh = historical("normal-high:8080",
10);
- final QueryableDruidServer turbo = historical("turbo:8080", 5);
-
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
- new Int2ObjectRBTreeMap<>(Comparator.naturalOrder());
- input.put(5, Set.of(nonTurboLow, turbo));
- input.put(10, Set.of(nonTurboHigh));
-
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
- target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES);
-
- // Expected iteration order for lowest-priority-first comparator:
- // 5 -> [nonTurboLow]
- // 10 -> [nonTurboHigh]
- // MAX -> [turbo] (dead last)
- Assert.assertEquals(
- Arrays.asList(5, 10, Integer.MAX_VALUE),
- new ArrayList<>(result.keySet())
- );
- Assert.assertEquals(Set.of(nonTurboLow), result.get(5));
- Assert.assertEquals(Set.of(nonTurboHigh), result.get(10));
- Assert.assertEquals(Set.of(turbo), result.get(Integer.MAX_VALUE));
- }
-
- @Test
- public void testGetQueryableServers_allTurbo_stillQueryableAsFallback()
- {
- target.setDynamicConfig(
- CoordinatorDynamicConfig
- .builder()
- .withTurboLoadingNodes(Set.of("turbo1:8080", "turbo2:8080"))
- .build()
- );
-
- final QueryableDruidServer turbo1 = historical("turbo1:8080", 10);
- final QueryableDruidServer turbo2 = historical("turbo2:8080", 5);
-
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
- new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
- input.put(10, Set.of(turbo1));
- input.put(5, Set.of(turbo2));
-
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
- target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES);
-
- // Original tier buckets are emptied out and removed; all turbo servers
collapse
- // into the dead-last bucket so queries still have somewhere to land.
- Assert.assertEquals(
- Collections.singletonList(Integer.MIN_VALUE),
- new ArrayList<>(result.keySet())
- );
- Assert.assertEquals(Set.of(turbo1, turbo2), result.get(Integer.MIN_VALUE));
- }
-
- @Test
- public void testGetQueryableServers_turboPlusCloneFilter_bothApplied()
- {
- target.setDynamicConfig(
- CoordinatorDynamicConfig
- .builder()
- .withCloneServers(Map.of("clone:8080", "source:8080"))
- .withTurboLoadingNodes(Set.of("turbo:8080"))
- .build()
- );
-
- final QueryableDruidServer source = historical("source:8080", 0);
- final QueryableDruidServer clone = historical("clone:8080", 0);
- final QueryableDruidServer turbo = historical("turbo:8080", 0);
-
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
- new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
- input.put(0, Set.of(source, clone, turbo));
-
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
- target.getQueryableServers(input, CloneQueryMode.EXCLUDECLONES);
-
- // EXCLUDECLONES drops the clone entirely; turbo is demoted to dead-last.
- Assert.assertEquals(Set.of(source), result.get(0));
- Assert.assertEquals(Set.of(turbo), result.get(Integer.MIN_VALUE));
-
Assert.assertFalse(result.values().stream().flatMap(Set::stream).anyMatch(s ->
s == clone));
- }
-
- @Test
- public void testGetQueryableServers_emptiedTier_priorityKeyOmitted()
- {
- target.setDynamicConfig(
- CoordinatorDynamicConfig
- .builder()
- .withCloneServers(Map.of("clone:8080", "source:8080"))
- .build()
- );
-
- final QueryableDruidServer clone = historical("clone:8080", 3);
-
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
- new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
- input.put(3, Set.of(clone));
-
- final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
- target.getQueryableServers(input, CloneQueryMode.EXCLUDECLONES);
-
- // Tier 3 was the only tier and its only server was clone-filtered; the
priority
- // key should be absent rather than present with an empty set.
- Assert.assertTrue(result.isEmpty());
- }
-
- @Test
- public void testSetDynamicConfig_updatesCachedTurboSet()
- {
- target.setDynamicConfig(
- CoordinatorDynamicConfig
- .builder()
- .withTurboLoadingNodes(Set.of("turbo-v1:8080"))
- .build()
- );
-
- final QueryableDruidServer serverV1 = historical("turbo-v1:8080", 0);
- final QueryableDruidServer serverV2 = historical("turbo-v2:8080", 0);
-
- Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
- new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
- input.put(0, Set.of(serverV1, serverV2));
-
- Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
- target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES);
- Assert.assertEquals(Set.of(serverV2), result.get(0));
- Assert.assertEquals(Set.of(serverV1), result.get(Integer.MIN_VALUE));
-
- // Swap which server is in turbo mode.
- target.setDynamicConfig(
- CoordinatorDynamicConfig.builder()
- .withTurboLoadingNodes(Set.of("turbo-v2:8080"))
- .build()
- );
-
- input = new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
- input.put(0, Set.of(serverV1, serverV2));
-
- result = target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES);
- Assert.assertEquals(Set.of(serverV1), result.get(0));
- Assert.assertEquals(Set.of(serverV2), result.get(Integer.MIN_VALUE));
- }
-
- /**
- * Build a {@link QueryableDruidServer} representing a historical at the
given host and priority.
- * The {@code hostAndPort} string doubles as the server name — the broker's
turbo and clone
- * lookups both key off {@link DruidServer#getHost()}, so tests need the
host string under
- * their control. The query runner is left null since these tests never
dispatch queries;
- * they only exercise the routing/filtering logic in
- * {@link BrokerViewOfCoordinatorConfig#getQueryableServers}.
- */
- private static QueryableDruidServer historical(String hostAndPort, int
priority)
- {
- final DruidServer server = new DruidServer(
- hostAndPort,
- hostAndPort,
- null,
- 0L,
- null,
- ServerType.HISTORICAL,
- DruidServer.DEFAULT_TIER,
- priority
- );
- return new QueryableDruidServer(server, null);
- }
-
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]