This is an automated email from the ASF dual-hosted git repository.

capistrant pushed a commit to branch revert-19328-deprioritize-turbo-historical
in repository https://gitbox.apache.org/repos/asf/druid.git

commit 360cfb113c1d417d02cc942803f00766cde27c3c
Author: Lucas Capistrant <[email protected]>
AuthorDate: Mon Apr 20 11:15:45 2026 -0500

    Revert "feat: Deprioritize turbo loading historicals for broker query 
routing…"
    
    This reverts commit 1f1cca6b0ead924e68a44fefbfe6a935bfdee118.
---
 docs/configuration/index.md                        |   2 +-
 .../client/BrokerViewOfCoordinatorConfig.java      |  82 ++-----
 .../client/BrokerViewOfCoordinatorConfigTest.java  | 238 ---------------------
 3 files changed, 16 insertions(+), 306 deletions(-)

diff --git a/docs/configuration/index.md b/docs/configuration/index.md
index 59ad60a5be8..e309126863a 100644
--- a/docs/configuration/index.md
+++ b/docs/configuration/index.md
@@ -833,7 +833,7 @@ The following table shows the dynamic configuration 
properties for the Coordinat
 |`decommissioningNodes`|List of Historical servers to decommission. 
Coordinator will not assign new segments to decommissioning servers, and 
segments will be moved away from them to be placed on non-decommissioning 
servers at the maximum rate specified by `maxSegmentsToMove`.|none|
 |`pauseCoordination`|Boolean flag for whether or not the Coordinator should 
execute its various duties of coordinating the cluster. Setting this to true 
essentially pauses all coordination work while allowing the API to remain up. 
Duties that are paused include all classes that implement the `CoordinatorDuty` 
interface. Such duties include: segment balancing, segment compaction, 
submitting kill tasks for unused segments (if enabled), logging of used 
segments in the cluster, marking of ne [...]
 |`replicateAfterLoadTimeout`|Boolean flag for whether or not additional 
replication is needed for segments that have failed to load due to the expiry 
of `druid.coordinator.load.timeout`. If this is set to true, the Coordinator 
will attempt to replicate the failed segment on a different historical server. 
This helps improve the segment availability if there are a few slow Historicals 
in the cluster. However, the slow Historical may still load the segment later 
and the Coordinator may issu [...]
-|`turboLoadingNodes`| Experimental. List of Historical servers to place in 
turbo loading mode. These servers use a larger thread-pool to load segments 
faster but at the cost of query performance. For servers specified in 
`turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is 
ignored and the coordinator uses the value of the respective 
`numLoadingThreads` instead.<br/>While a server is in `turboLoadingNodes`, 
Brokers demote it to the lowest query priority: for any segmen [...]
+|`turboLoadingNodes`| Experimental. List of Historical servers to place in 
turbo loading mode. These servers use a larger thread-pool to load segments 
faster but at the cost of query performance. For servers specified in 
`turboLoadingNodes`, `druid.coordinator.loadqueuepeon.http.batchSize` is 
ignored and the coordinator uses the value of the respective 
`numLoadingThreads` instead.<br/>Please use this config with caution. All 
servers should eventually be removed from this list once the se [...]
 |`cloneServers`| Experimental. Map from target Historical server to source 
Historical server which should be cloned by the target. The target Historical 
does not participate in regular segment assignment or balancing. Instead, the 
Coordinator mirrors any segment assignment made to the source Historical onto 
the target Historical, so that the target becomes an exact copy of the source. 
Segments on the target Historical do not count towards replica counts either. 
If the source disappears,  [...]
 |`historicalTierAliases`|Map from a virtual tier name to the set of real 
Historical tier names it expands to. When a load/drop rule references a virtual 
alias tier, the Coordinator replaces it with its real tiers — each receiving 
the full replica count independently. The alias key itself is never loaded to 
directly. For example, `{"hot": ["hot_1", "hot_2"]}` causes a rule of `{"hot": 
2}` to load 2 replicas on each of `hot_1` and `hot_2`; `hot` receives no direct 
assignment. An alias valu [...]
 
diff --git 
a/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java
 
b/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java
index 316378116e5..f09a51a06b3 100644
--- 
a/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java
+++ 
b/server/src/main/java/org/apache/druid/client/BrokerViewOfCoordinatorConfig.java
@@ -41,10 +41,9 @@ import org.apache.druid.server.BrokerDynamicConfigResource;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 
 import javax.validation.constraints.NotNull;
-import java.util.Comparator;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
 /**
  * Broker view of the coordinator dynamic configuration, and its derived 
values such as target and source clone servers.
@@ -59,8 +58,6 @@ public class BrokerViewOfCoordinatorConfig extends 
BaseBrokerViewOfConfig<Coordi
   private Set<String> targetCloneServers;
   @GuardedBy("this")
   private Set<String> sourceCloneServers;
-  @GuardedBy("this")
-  private Set<String> turboLoadingNodes;
 
   @Inject
   public BrokerViewOfCoordinatorConfig(
@@ -100,7 +97,7 @@ public class BrokerViewOfCoordinatorConfig extends 
BaseBrokerViewOfConfig<Coordi
 
   /**
    * Update the config view with a new coordinator dynamic config snapshot. 
Also updates the source and target clone
-   * servers and the turbo-loading nodes based on the new dynamic 
configuration.
+   * servers based on the new dynamic configuration.
    */
   @Override
   public synchronized void setDynamicConfig(@NotNull CoordinatorDynamicConfig 
updatedConfig)
@@ -109,7 +106,6 @@ public class BrokerViewOfCoordinatorConfig extends 
BaseBrokerViewOfConfig<Coordi
     final Map<String, String> cloneServers = updatedConfig.getCloneServers();
     this.targetCloneServers = ImmutableSet.copyOf(cloneServers.keySet());
     this.sourceCloneServers = ImmutableSet.copyOf(cloneServers.values());
-    this.turboLoadingNodes = 
ImmutableSet.copyOf(updatedConfig.getTurboLoadingNodes());
   }
 
   @Override
@@ -118,70 +114,37 @@ public class BrokerViewOfCoordinatorConfig extends 
BaseBrokerViewOfConfig<Coordi
       CloneQueryMode mode
   )
   {
-    final Set<String> serversToIgnore;
-    final Set<String> turboNodes;
-    synchronized (this) {
-      serversToIgnore = getCurrentServersToIgnore(mode);
-      turboNodes = turboLoadingNodes == null ? Set.of() : turboLoadingNodes;
-    }
+    final Set<String> serversToIgnore = getCurrentServersToIgnore(mode);
 
-    if (serversToIgnore.isEmpty() && turboNodes.isEmpty()) {
+    if (serversToIgnore.isEmpty()) {
       return historicalServers;
     }
 
-    // Preserve the comparator from the input map so the tier selection 
strategy's ordering is respected.
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> filteredHistoricals =
-        new Int2ObjectRBTreeMap<>(historicalServers.comparator());
-
-    final Set<QueryableDruidServer> deprioritizedTurboHistoricals = new 
HashSet<>();
-
+    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> filteredHistoricals = 
new Int2ObjectRBTreeMap<>();
     for (int priority : historicalServers.keySet()) {
-      final Set<QueryableDruidServer> preferredHistoricals = new HashSet<>();
-      for (QueryableDruidServer server : historicalServers.get(priority)) {
-        final String host = server.getServer().getHost();
-        if (serversToIgnore.contains(host)) {
-          // Clone filtering removes the server entirely from the queryable 
set.
-          continue;
-        }
-        if (turboNodes.contains(host)) {
-          // Turbo-loading servers are demoted to a dead-last bucket so 
queries prefer
-          // non-turbo replicas, but still fall back to turbo replicas when no 
alternative exists.
-          deprioritizedTurboHistoricals.add(server);
-          continue;
-        }
-        preferredHistoricals.add(server);
-      }
-      if (!preferredHistoricals.isEmpty()) {
-        filteredHistoricals.put(priority, preferredHistoricals);
-      }
-    }
-
-    if (!deprioritizedTurboHistoricals.isEmpty()) {
-      final int deadLastPriority = 
computeDeadLastPriority(historicalServers.comparator());
-      // If a real tier already occupies MIN/MAX_VALUE, merge rather than 
overwrite.
-      filteredHistoricals.merge(deadLastPriority, 
deprioritizedTurboHistoricals, (existing, toAdd) -> {
-        final Set<QueryableDruidServer> merged = new HashSet<>(existing);
-        merged.addAll(toAdd);
-        return merged;
-      });
+      Set<QueryableDruidServer> servers = historicalServers.get(priority);
+      filteredHistoricals.put(priority,
+                              servers.stream()
+                                     .filter(server -> 
!serversToIgnore.contains(server.getServer().getHost()))
+                                     .collect(Collectors.toSet())
+      );
     }
 
     return filteredHistoricals;
   }
 
   /**
-   * Get the set of servers that should not be queried based on the 
cloneQueryMode parameter.
+   * Get the list of servers that should not be queried based on the 
cloneQueryMode parameter.
    */
-  @GuardedBy("this")
-  private Set<String> getCurrentServersToIgnore(CloneQueryMode cloneQueryMode)
+  private synchronized Set<String> getCurrentServersToIgnore(CloneQueryMode 
cloneQueryMode)
   {
     switch (cloneQueryMode) {
       case PREFERCLONES:
         // Remove servers being cloned targets, so that clones are queried.
-        return sourceCloneServers == null ? Set.of() : sourceCloneServers;
+        return sourceCloneServers;
       case EXCLUDECLONES:
         // Remove clones, so that only source servers are queried.
-        return targetCloneServers == null ? Set.of() : targetCloneServers;
+        return targetCloneServers;
       case INCLUDECLONES:
         // Don't remove either.
         return Set.of();
@@ -189,19 +152,4 @@ public class BrokerViewOfCoordinatorConfig extends 
BaseBrokerViewOfConfig<Coordi
         throw DruidException.defensive("Unexpected value of 
cloneQueryMode[%s]", cloneQueryMode);
     }
   }
-
-  /**
-   * Pick a priority value that sorts LAST under the given comparator, so that 
turbo-loading
-   * servers are only considered when no other server has the segment.
-   */
-  private static int computeDeadLastPriority(Comparator<? super Integer> cmp)
-  {
-    if (cmp == null) {
-      // Natural int ordering (ascending) → MAX_VALUE sorts last.
-      return Integer.MAX_VALUE;
-    }
-    // If the comparator puts higher values first (e.g. reverseOrder), then 
MIN_VALUE
-    // sorts last; otherwise MAX_VALUE sorts last.
-    return cmp.compare(1, 0) < 0 ? Integer.MIN_VALUE : Integer.MAX_VALUE;
-  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java
 
b/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java
index 86176888119..5a014531bf5 100644
--- 
a/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java
+++ 
b/server/src/test/java/org/apache/druid/client/BrokerViewOfCoordinatorConfigTest.java
@@ -20,22 +20,14 @@
 package org.apache.druid.client;
 
 import com.google.common.util.concurrent.Futures;
-import it.unimi.dsi.fastutil.ints.Int2ObjectRBTreeMap;
 import org.apache.druid.client.coordinator.CoordinatorClient;
-import org.apache.druid.query.CloneQueryMode;
-import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.CoordinatorDynamicConfig;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
 
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.Comparator;
 import java.util.Map;
-import java.util.Set;
 
 public class BrokerViewOfCoordinatorConfigTest
 {
@@ -63,234 +55,4 @@ public class BrokerViewOfCoordinatorConfigTest
     Mockito.verify(coordinatorClient, 
Mockito.times(1)).getCoordinatorDynamicConfig();
     Assert.assertEquals(config, target.getDynamicConfig());
   }
-
-  @Test
-  public void 
testGetQueryableServers_noFiltersOrTurbo_returnsOriginalMapReference()
-  {
-    target.setDynamicConfig(CoordinatorDynamicConfig.builder().build());
-
-    final QueryableDruidServer server = historical("h1:8080", 0);
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
-        new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
-    input.put(0, Set.of(server));
-
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
-        target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES);
-
-    // Fast path: no filtering needed, return the original map reference 
unchanged.
-    Assert.assertSame(input, result);
-  }
-
-  @Test
-  public void 
testGetQueryableServers_turboNodeDemoted_highestPriorityStrategy()
-  {
-    target.setDynamicConfig(
-        CoordinatorDynamicConfig
-            .builder()
-            .withTurboLoadingNodes(Set.of("turbo:8080"))
-            .build()
-    );
-
-    final QueryableDruidServer nonTurboHighTier = 
historical("normal-high:8080", 10);
-    final QueryableDruidServer nonTurboLowTier = historical("normal-low:8080", 
5);
-    final QueryableDruidServer turbo = historical("turbo:8080", 10);
-
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
-        new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
-    input.put(10, Set.of(nonTurboHighTier, turbo));
-    input.put(5, Set.of(nonTurboLowTier));
-
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
-        target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES);
-
-    // Expected iteration order for highest-priority-first comparator:
-    //   10 -> [nonTurboHighTier]
-    //    5 -> [nonTurboLowTier]
-    //  MIN -> [turbo] (dead last)
-    Assert.assertEquals(
-        Arrays.asList(10, 5, Integer.MIN_VALUE),
-        new ArrayList<>(result.keySet())
-    );
-    Assert.assertEquals(Set.of(nonTurboHighTier), result.get(10));
-    Assert.assertEquals(Set.of(nonTurboLowTier), result.get(5));
-    Assert.assertEquals(Set.of(turbo), result.get(Integer.MIN_VALUE));
-  }
-
-  @Test
-  public void testGetQueryableServers_turboNodeDemoted_lowestPriorityStrategy()
-  {
-    target.setDynamicConfig(
-        CoordinatorDynamicConfig
-            .builder()
-            .withTurboLoadingNodes(Set.of("turbo:8080"))
-            .build()
-    );
-
-    final QueryableDruidServer nonTurboLow = historical("normal-low:8080", 5);
-    final QueryableDruidServer nonTurboHigh = historical("normal-high:8080", 
10);
-    final QueryableDruidServer turbo = historical("turbo:8080", 5);
-
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
-        new Int2ObjectRBTreeMap<>(Comparator.naturalOrder());
-    input.put(5, Set.of(nonTurboLow, turbo));
-    input.put(10, Set.of(nonTurboHigh));
-
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
-        target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES);
-
-    // Expected iteration order for lowest-priority-first comparator:
-    //    5 -> [nonTurboLow]
-    //   10 -> [nonTurboHigh]
-    //  MAX -> [turbo] (dead last)
-    Assert.assertEquals(
-        Arrays.asList(5, 10, Integer.MAX_VALUE),
-        new ArrayList<>(result.keySet())
-    );
-    Assert.assertEquals(Set.of(nonTurboLow), result.get(5));
-    Assert.assertEquals(Set.of(nonTurboHigh), result.get(10));
-    Assert.assertEquals(Set.of(turbo), result.get(Integer.MAX_VALUE));
-  }
-
-  @Test
-  public void testGetQueryableServers_allTurbo_stillQueryableAsFallback()
-  {
-    target.setDynamicConfig(
-        CoordinatorDynamicConfig
-            .builder()
-            .withTurboLoadingNodes(Set.of("turbo1:8080", "turbo2:8080"))
-            .build()
-    );
-
-    final QueryableDruidServer turbo1 = historical("turbo1:8080", 10);
-    final QueryableDruidServer turbo2 = historical("turbo2:8080", 5);
-
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
-        new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
-    input.put(10, Set.of(turbo1));
-    input.put(5, Set.of(turbo2));
-
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
-        target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES);
-
-    // Original tier buckets are emptied out and removed; all turbo servers 
collapse
-    // into the dead-last bucket so queries still have somewhere to land.
-    Assert.assertEquals(
-        Collections.singletonList(Integer.MIN_VALUE),
-        new ArrayList<>(result.keySet())
-    );
-    Assert.assertEquals(Set.of(turbo1, turbo2), result.get(Integer.MIN_VALUE));
-  }
-
-  @Test
-  public void testGetQueryableServers_turboPlusCloneFilter_bothApplied()
-  {
-    target.setDynamicConfig(
-        CoordinatorDynamicConfig
-            .builder()
-            .withCloneServers(Map.of("clone:8080", "source:8080"))
-            .withTurboLoadingNodes(Set.of("turbo:8080"))
-            .build()
-    );
-
-    final QueryableDruidServer source = historical("source:8080", 0);
-    final QueryableDruidServer clone = historical("clone:8080", 0);
-    final QueryableDruidServer turbo = historical("turbo:8080", 0);
-
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
-        new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
-    input.put(0, Set.of(source, clone, turbo));
-
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
-        target.getQueryableServers(input, CloneQueryMode.EXCLUDECLONES);
-
-    // EXCLUDECLONES drops the clone entirely; turbo is demoted to dead-last.
-    Assert.assertEquals(Set.of(source), result.get(0));
-    Assert.assertEquals(Set.of(turbo), result.get(Integer.MIN_VALUE));
-    
Assert.assertFalse(result.values().stream().flatMap(Set::stream).anyMatch(s -> 
s == clone));
-  }
-
-  @Test
-  public void testGetQueryableServers_emptiedTier_priorityKeyOmitted()
-  {
-    target.setDynamicConfig(
-        CoordinatorDynamicConfig
-            .builder()
-            .withCloneServers(Map.of("clone:8080", "source:8080"))
-            .build()
-    );
-
-    final QueryableDruidServer clone = historical("clone:8080", 3);
-
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
-        new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
-    input.put(3, Set.of(clone));
-
-    final Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
-        target.getQueryableServers(input, CloneQueryMode.EXCLUDECLONES);
-
-    // Tier 3 was the only tier and its only server was clone-filtered; the 
priority
-    // key should be absent rather than present with an empty set.
-    Assert.assertTrue(result.isEmpty());
-  }
-
-  @Test
-  public void testSetDynamicConfig_updatesCachedTurboSet()
-  {
-    target.setDynamicConfig(
-        CoordinatorDynamicConfig
-            .builder()
-            .withTurboLoadingNodes(Set.of("turbo-v1:8080"))
-            .build()
-    );
-
-    final QueryableDruidServer serverV1 = historical("turbo-v1:8080", 0);
-    final QueryableDruidServer serverV2 = historical("turbo-v2:8080", 0);
-
-    Int2ObjectRBTreeMap<Set<QueryableDruidServer>> input =
-        new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
-    input.put(0, Set.of(serverV1, serverV2));
-
-    Int2ObjectRBTreeMap<Set<QueryableDruidServer>> result =
-        target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES);
-    Assert.assertEquals(Set.of(serverV2), result.get(0));
-    Assert.assertEquals(Set.of(serverV1), result.get(Integer.MIN_VALUE));
-
-    // Swap which server is in turbo mode.
-    target.setDynamicConfig(
-        CoordinatorDynamicConfig.builder()
-                                .withTurboLoadingNodes(Set.of("turbo-v2:8080"))
-                                .build()
-    );
-
-    input = new Int2ObjectRBTreeMap<>(Comparator.reverseOrder());
-    input.put(0, Set.of(serverV1, serverV2));
-
-    result = target.getQueryableServers(input, CloneQueryMode.INCLUDECLONES);
-    Assert.assertEquals(Set.of(serverV1), result.get(0));
-    Assert.assertEquals(Set.of(serverV2), result.get(Integer.MIN_VALUE));
-  }
-
-  /**
-   * Build a {@link QueryableDruidServer} representing a historical at the 
given host and priority.
-   * The {@code hostAndPort} string doubles as the server name — the broker's 
turbo and clone
-   * lookups both key off {@link DruidServer#getHost()}, so tests need the 
host string under
-   * their control. The query runner is left null since these tests never 
dispatch queries;
-   * they only exercise the routing/filtering logic in
-   * {@link BrokerViewOfCoordinatorConfig#getQueryableServers}.
-   */
-  private static QueryableDruidServer historical(String hostAndPort, int 
priority)
-  {
-    final DruidServer server = new DruidServer(
-        hostAndPort,
-        hostAndPort,
-        null,
-        0L,
-        null,
-        ServerType.HISTORICAL,
-        DruidServer.DEFAULT_TIER,
-        priority
-    );
-    return new QueryableDruidServer(server, null);
-  }
-
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to