This is an automated email from the ASF dual-hosted git repository. capistrant pushed a commit to branch revert-19328-deprioritize-turbo-historical in repository https://gitbox.apache.org/repos/asf/druid.git
commit 360cfb113c1d417d02cc942803f00766cde27c3c Author: Lucas Capistrant <[email protected]> AuthorDate: Mon Apr 20 11:15:45 2026 -0500 Revert "feat: Deprioritize turbo loading historicals for broker query routing…" This reverts commit 1f1cca6b0ead924e68a44fefbfe6a935bfdee118. --- docs/configuration/index.md | 2 +- .../client/BrokerViewOfCoordinatorConfig.java | 82 ++----- .../client/BrokerViewOfCoordinatorConfigTest.java | 238 --------------------- 3 files changed, 16 insertions(+), 306 deletions(-) diff --git a/docs/configuration/index.md b/docs/configuration/index.md index 59ad60a5be8..e309126863a 100644 --- a/docs/configuration/index.md +++ b/docs/configuration/index.md @@ -833,7 +833,7 @@ The following table shows the dynamic configuration properties for the Coordinat |`decommissioningNodes`|List of Historical servers to decommission. Coordinator will not assign new segments to decommissioning servers, and segments will be moved away from them to be placed on non-decommissioning servers at the maximum rate specified by `maxSegmentsToMove`.|none| |`pauseCoordination`|Boolean flag for whether or not the Coordinator should execute its various duties of coordinating the cluster. Setting this to true essentially pauses all coordination work while allowing the API to remain up. Duties that are paused include all classes that implement the `CoordinatorDuty` interface. Such duties include: segment balancing, segment compaction, submitting kill tasks for unused segments (if enabled), logging of used segments in the cluster, marking of ne [...] |`replicateAfterLoadTimeout`|Boolean flag for whether or not additional replication is needed for segments that have failed to load due to the expiry of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator will attempt to replicate the failed segment on a different historical server. This helps improve the segment availability if there are a few slow Historicals in the cluster. However, the slow Historical may still load the segment later and the Coordinator may issu [...] -|`turboLoadingNodes`| Experimental. List of Historical servers to place in turbo loading mode. These servers use a larger thread-pool to load segments faster but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator uses the value of the respective `numLoadingThreads` instead.<br/>While a server is in `turboLoadingNodes`, Brokers demote it to the lowest query priority: for any segmen [...] +|`turboLoadingNodes`| Experimental. List of Historical servers to place in turbo loading mode. These servers use a larger thread-pool to load segments faster but at the cost of query performance. For servers specified in `turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is ignored and the coordinator uses the value of the respective `numLoadingThreads` instead.<br/>Please use this config with caution. All servers should eventually be removed from this list once the se [...] |`cloneServers`| Experimental. Map from target Historical server to source Historical server which should be cloned by the target. The target Historical does not participate in regular segment assignment or balancing. Instead, the Coordinator mirrors any segment assignment made to the source Historical onto the target Historical, so that the target becomes an exact copy of the source. Segments on the target Historical do not count towards replica counts either. If the source disappears, [...] |`historicalTierAliases`|Map from a virtual tier name to the set of real Historical tier names it expands to. When a load/drop rule references a virtual alias tier, the Coordinator replaces it with its real tiers — each receiving the full replica count independently. The alias key itself is never loaded to directly. For example, `{"hot": ["hot_1", "hot_2"]}` causes a rule of `{"hot": 2}` to load 2 replicas on each of `hot_1` and `hot_2`; `hot` receives no direct assignment. An alias valu [...] diff --git a/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java b/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java index 316378116e5..f09a51a06b3 100644 --- a/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java +++ b/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java @@ -41,10 +41,9 @@ import org.apache.druid.server.BrokerDynamicConfigResource; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import javax.validation.constraints.NotNull; -import java.util.Comparator; -import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; /** * Broker view of the coordinator dynamic configuration, and its derived values such as target and source clone servers. @@ -59,8 +58,6 @@ public class BrokerViewOfCoordinatorConfig extends BaseBrokerViewOfConfig<Coordi private Set<String> targetCloneServers; @GuardedBy("this") private Set<String> sourceCloneServers; - @GuardedBy("this") - private Set<String> turboLoadingNodes; @Inject public BrokerViewOfCoordinatorConfig( @@ -100,7 +97,7 @@ public class BrokerViewOfCoordinatorConfig extends BaseBrokerViewOfConfig<Coordi /** * Update the config view with a new coordinator dynamic config snapshot. Also updates the source and target clone - * servers and the turbo-loading nodes based on the new dynamic configuration. + * servers based on the new dynamic configuration. */ @Override public synchronized void setDynamicConfig(@NotNull CoordinatorDynamicConfig updatedConfig) @@ -109,7 +106,6 @@ public class BrokerViewOfCoordinatorConfig extends BaseBrokerViewOfConfig<Coordi final Map<String, String> cloneServers = updatedConfig.getCloneServers(); this.targetCloneServers = ImmutableSet.copyOf(cloneServers.keySet()); this.sourceCloneServers = ImmutableSet.copyOf(cloneServers.values()); - this.turboLoadingNodes = ImmutableSet.copyOf(updatedConfig.getTurboLoadingNodes()); } @Override @@ -118,70 +114,37 @@ public class BrokerViewOfCoordinatorConfig extends BaseBrokerViewOfConfig<Coordi CloneQueryMode mode ) { - final Set<String> serversToIgnore; - final Set<String> turboNodes; - synchronized (this) { - serversToIgnore = getCurrentServersToIgnore(mode); - turboNodes = turboLoadingNodes == null ? Set.of() : turboLoadingNodes; - } + final Set<String> serversToIgnore = getCurrentServersToIgnore(mode); - if (serversToIgnore.isEmpty() && turboNodes.isEmpty()) { + if (serversToIgnore.isEmpty()) { return historicalServers; } - // Preserve the comparator from the input map so the tier selection strategy's ordering is respected. - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> filteredHistoricals = - new Int2ObjectRBTreeMap<>(historicalServers.comparator()); - - final Set<QueryableDruidServer> deprioritizedTurboHistoricals = new HashSet<>(); - + final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> filteredHistoricals = new Int2ObjectRBTreeMap<>(); for (int priority : historicalServers.keySet()) { - final Set<QueryableDruidServer> preferredHistoricals = new HashSet<>(); - for (QueryableDruidServer server : historicalServers.get(priority)) { - final String host = server.getServer().getHost(); - if (serversToIgnore.contains(host)) { - // Clone filtering removes the server entirely from the queryable set. - continue; - } - if (turboNodes.contains(host)) { - // Turbo-loading servers are demoted to a dead-last bucket so queries prefer - // non-turbo replicas, but still fall back to turbo replicas when no alternative exists. - deprioritizedTurboHistoricals.add(server); - continue; - } - preferredHistoricals.add(server); - } - if (!preferredHistoricals.isEmpty()) { - filteredHistoricals.put(priority, preferredHistoricals); - } - } - - if (!deprioritizedTurboHistoricals.isEmpty()) { - final int deadLastPriority = computeDeadLastPriority(historicalServers.comparator()); - // If a real tier already occupies MIN/MAX_VALUE, merge rather than overwrite. - filteredHistoricals.merge(deadLastPriority, deprioritizedTurboHistoricals, (existing, toAdd) -> { - final Set<QueryableDruidServer> merged = new HashSet<>(existing); - merged.addAll(toAdd); - return merged; - }); + Set<QueryableDruidServer> servers = historicalServers.get(priority); + filteredHistoricals.put(priority, + servers.stream() + .filter(server -> !serversToIgnore.contains(server.getServer().getHost())) + .collect(Collectors.toSet()) + ); } return filteredHistoricals; } /** - * Get the set of servers that should not be queried based on the cloneQueryMode parameter. + * Get the list of servers that should not be queried based on the cloneQueryMode parameter. */ - @GuardedBy("this") - private Set<String> getCurrentServersToIgnore(CloneQueryMode cloneQueryMode) + private synchronized Set<String> getCurrentServersToIgnore(CloneQueryMode cloneQueryMode) { switch (cloneQueryMode) { case PREFERCLONES: // Remove servers being cloned targets, so that clones are queried. - return sourceCloneServers == null ? Set.of() : sourceCloneServers; + return sourceCloneServers; case EXCLUDECLONES: // Remove clones, so that only source servers are queried. - return targetCloneServers == null ? Set.of() : targetCloneServers; + return targetCloneServers; case INCLUDECLONES: // Don't remove either. return Set.of(); @@ -189,19 +152,4 @@ public class BrokerViewOfCoordinatorConfig extends BaseBrokerViewOfConfig<Coordi throw DruidException.defensive("Unexpected value of cloneQueryMode[%s]", cloneQueryMode); } } - - /** - * Pick a priority value that sorts LAST under the given comparator, so that turbo-loading - * servers are only considered when no other server has the segment. - */ - private static int computeDeadLastPriority(Comparator<? super Integer> cmp) - { - if (cmp == null) { - // Natural int ordering (ascending) → MAX_VALUE sorts last. - return Integer.MAX_VALUE; - } - // If the comparator puts higher values first (e.g. reverseOrder), then MIN_VALUE - // sorts last; otherwise MAX_VALUE sorts last. - return cmp.compare(1, 0) < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE; - } } diff --git a/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java b/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java index 86176888119..5a014531bf5 100644 --- a/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java +++ b/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java @@ -20,22 +20,14 @@ package org.apache.druid.client; import com.google.common.util.concurrent.Futures; -import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap; import org.apache.druid.client.coordinator.CoordinatorClient; -import org.apache.druid.query.CloneQueryMode; -import org.apache.druid.server.coordination.ServerType; import org.apache.druid.server.coordinator.CoordinatorDynamicConfig; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; import java.util.Map; -import java.util.Set; public class BrokerViewOfCoordinatorConfigTest { @@ -63,234 +55,4 @@ public class BrokerViewOfCoordinatorConfigTest Mockito.verify(coordinatorClient, Mockito.times(1)).getCoordinatorDynamicConfig(); Assert.assertEquals(config, target.getDynamicConfig()); } - - @Test - public void testGetQueryableServers_noFiltersOrTurbo_returnsOriginalMapReference() - { - target.setDynamicConfig(CoordinatorDynamicConfig.builder().build()); - - final QueryableDruidServer server = historical("h1:8080", 0); - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input = - new Int2ObjectRBTreeMap<>(Comparator.reverseOrder()); - input.put(0, Set.of(server)); - - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result = - target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES); - - // Fast path: no filtering needed, return the original map reference unchanged. - Assert.assertSame(input, result); - } - - @Test - public void testGetQueryableServers_turboNodeDemoted_highestPriorityStrategy() - { - target.setDynamicConfig( - CoordinatorDynamicConfig - .builder() - .withTurboLoadingNodes(Set.of("turbo:8080")) - .build() - ); - - final QueryableDruidServer nonTurboHighTier = historical("normal-high:8080", 10); - final QueryableDruidServer nonTurboLowTier = historical("normal-low:8080", 5); - final QueryableDruidServer turbo = historical("turbo:8080", 10); - - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input = - new Int2ObjectRBTreeMap<>(Comparator.reverseOrder()); - input.put(10, Set.of(nonTurboHighTier, turbo)); - input.put(5, Set.of(nonTurboLowTier)); - - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result = - target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES); - - // Expected iteration order for highest-priority-first comparator: - // 10 -> [nonTurboHighTier] - // 5 -> [nonTurboLowTier] - // MIN -> [turbo] (dead last) - Assert.assertEquals( - Arrays.asList(10, 5, Integer.MIN_VALUE), - new ArrayList<>(result.keySet()) - ); - Assert.assertEquals(Set.of(nonTurboHighTier), result.get(10)); - Assert.assertEquals(Set.of(nonTurboLowTier), result.get(5)); - Assert.assertEquals(Set.of(turbo), result.get(Integer.MIN_VALUE)); - } - - @Test - public void testGetQueryableServers_turboNodeDemoted_lowestPriorityStrategy() - { - target.setDynamicConfig( - CoordinatorDynamicConfig - .builder() - .withTurboLoadingNodes(Set.of("turbo:8080")) - .build() - ); - - final QueryableDruidServer nonTurboLow = historical("normal-low:8080", 5); - final QueryableDruidServer nonTurboHigh = historical("normal-high:8080", 10); - final QueryableDruidServer turbo = historical("turbo:8080", 5); - - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input = - new Int2ObjectRBTreeMap<>(Comparator.naturalOrder()); - input.put(5, Set.of(nonTurboLow, turbo)); - input.put(10, Set.of(nonTurboHigh)); - - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result = - target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES); - - // Expected iteration order for lowest-priority-first comparator: - // 5 -> [nonTurboLow] - // 10 -> [nonTurboHigh] - // MAX -> [turbo] (dead last) - Assert.assertEquals( - Arrays.asList(5, 10, Integer.MAX_VALUE), - new ArrayList<>(result.keySet()) - ); - Assert.assertEquals(Set.of(nonTurboLow), result.get(5)); - Assert.assertEquals(Set.of(nonTurboHigh), result.get(10)); - Assert.assertEquals(Set.of(turbo), result.get(Integer.MAX_VALUE)); - } - - @Test - public void testGetQueryableServers_allTurbo_stillQueryableAsFallback() - { - target.setDynamicConfig( - CoordinatorDynamicConfig - .builder() - .withTurboLoadingNodes(Set.of("turbo1:8080", "turbo2:8080")) - .build() - ); - - final QueryableDruidServer turbo1 = historical("turbo1:8080", 10); - final QueryableDruidServer turbo2 = historical("turbo2:8080", 5); - - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input = - new Int2ObjectRBTreeMap<>(Comparator.reverseOrder()); - input.put(10, Set.of(turbo1)); - input.put(5, Set.of(turbo2)); - - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result = - target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES); - - // Original tier buckets are emptied out and removed; all turbo servers collapse - // into the dead-last bucket so queries still have somewhere to land. - Assert.assertEquals( - Collections.singletonList(Integer.MIN_VALUE), - new ArrayList<>(result.keySet()) - ); - Assert.assertEquals(Set.of(turbo1, turbo2), result.get(Integer.MIN_VALUE)); - } - - @Test - public void testGetQueryableServers_turboPlusCloneFilter_bothApplied() - { - target.setDynamicConfig( - CoordinatorDynamicConfig - .builder() - .withCloneServers(Map.of("clone:8080", "source:8080")) - .withTurboLoadingNodes(Set.of("turbo:8080")) - .build() - ); - - final QueryableDruidServer source = historical("source:8080", 0); - final QueryableDruidServer clone = historical("clone:8080", 0); - final QueryableDruidServer turbo = historical("turbo:8080", 0); - - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input = - new Int2ObjectRBTreeMap<>(Comparator.reverseOrder()); - input.put(0, Set.of(source, clone, turbo)); - - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result = - target.getQueryableServers(input, CloneQueryMode.EXCLUDECLONES); - - // EXCLUDECLONES drops the clone entirely; turbo is demoted to dead-last. - Assert.assertEquals(Set.of(source), result.get(0)); - Assert.assertEquals(Set.of(turbo), result.get(Integer.MIN_VALUE)); - Assert.assertFalse(result.values().stream().flatMap(Set::stream).anyMatch(s -> s == clone)); - } - - @Test - public void testGetQueryableServers_emptiedTier_priorityKeyOmitted() - { - target.setDynamicConfig( - CoordinatorDynamicConfig - .builder() - .withCloneServers(Map.of("clone:8080", "source:8080")) - .build() - ); - - final QueryableDruidServer clone = historical("clone:8080", 3); - - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input = - new Int2ObjectRBTreeMap<>(Comparator.reverseOrder()); - input.put(3, Set.of(clone)); - - final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result = - target.getQueryableServers(input, CloneQueryMode.EXCLUDECLONES); - - // Tier 3 was the only tier and its only server was clone-filtered; the priority - // key should be absent rather than present with an empty set. - Assert.assertTrue(result.isEmpty()); - } - - @Test - public void testSetDynamicConfig_updatesCachedTurboSet() - { - target.setDynamicConfig( - CoordinatorDynamicConfig - .builder() - .withTurboLoadingNodes(Set.of("turbo-v1:8080")) - .build() - ); - - final QueryableDruidServer serverV1 = historical("turbo-v1:8080", 0); - final QueryableDruidServer serverV2 = historical("turbo-v2:8080", 0); - - Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input = - new Int2ObjectRBTreeMap<>(Comparator.reverseOrder()); - input.put(0, Set.of(serverV1, serverV2)); - - Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result = - target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES); - Assert.assertEquals(Set.of(serverV2), result.get(0)); - Assert.assertEquals(Set.of(serverV1), result.get(Integer.MIN_VALUE)); - - // Swap which server is in turbo mode. - target.setDynamicConfig( - CoordinatorDynamicConfig.builder() - .withTurboLoadingNodes(Set.of("turbo-v2:8080")) - .build() - ); - - input = new Int2ObjectRBTreeMap<>(Comparator.reverseOrder()); - input.put(0, Set.of(serverV1, serverV2)); - - result = target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES); - Assert.assertEquals(Set.of(serverV1), result.get(0)); - Assert.assertEquals(Set.of(serverV2), result.get(Integer.MIN_VALUE)); - } - - /** - * Build a {@link QueryableDruidServer} representing a historical at the given host and priority. - * The {@code hostAndPort} string doubles as the server name — the broker's turbo and clone - * lookups both key off {@link DruidServer#getHost()}, so tests need the host string under - * their control. The query runner is left null since these tests never dispatch queries; - * they only exercise the routing/filtering logic in - * {@link BrokerViewOfCoordinatorConfig#getQueryableServers}. - */ - private static QueryableDruidServer historical(String hostAndPort, int priority) - { - final DruidServer server = new DruidServer( - hostAndPort, - hostAndPort, - null, - 0L, - null, - ServerType.HISTORICAL, - DruidServer.DEFAULT_TIER, - priority - ); - return new QueryableDruidServer(server, null); - } - } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
