This is an automated email from the ASF dual-hosted git repository.

clintropolis pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/master by this push:
     new 8d526bd0b6d feat: add partial load rule types (#19374)
8d526bd0b6d is described below

commit 8d526bd0b6dd5169783e0e6e071c2f079573999d
Author: Clint Wylie <[email protected]>
AuthorDate: Wed Apr 29 16:27:16 2026 -0700

    feat: add partial load rule types (#19374)
    
    changes:
    * adds `PartialLoadRule` abstract class to capture load rules that should 
partially load a segment on some tier
    * adds `IntervalPartialLoadRule`, `ForeverPartialLoadRule`, and 
`PeriodPartialLoadRule` implementations to mirror non-partial load rules
    * adds `PartialLoadMatcher` interface to match and select what to partial 
load
    * adds `ExactProjectionPartialLoadMatcher` and 
`WildcardProjectionPartialLoadMatcher` to do partial loading of projections
    * adds `CannotMatchBehavior` enum to describe behavior of `PartialLoadRule` 
when `PartialLoadMatcher` is unable to match a segment
    * since partial loading is not available yet, partial rules function as 
regular load rules until follow-up work
    * tests
---
 .../coordinator/rules/CannotMatchBehavior.java     |  81 ++++
 .../rules/ExactProjectionPartialLoadMatcher.java   | 100 +++++
 .../coordinator/rules/ForeverPartialLoadRule.java  |  61 +++
 .../coordinator/rules/IntervalPartialLoadRule.java |  94 +++++
 .../coordinator/rules/PartialLoadMatcher.java      |  60 +++
 .../server/coordinator/rules/PartialLoadRule.java  | 119 ++++++
 .../coordinator/rules/PeriodPartialLoadRule.java   | 104 +++++
 .../rules/ProjectionPartialLoadMatcher.java        |  83 ++++
 .../druid/server/coordinator/rules/Rule.java       |  20 +
 .../rules/UnknownPartialLoadMatcher.java           |  78 ++++
 .../WildcardProjectionPartialLoadMatcher.java      | 218 ++++++++++
 .../druid/server/http/DataSourcesResource.java     |  36 +-
 .../ExactProjectionPartialLoadMatcherTest.java     | 169 ++++++++
 .../coordinator/rules/PartialLoadMatcherTest.java  | 145 +++++++
 .../coordinator/rules/PartialLoadRuleTest.java     | 389 ++++++++++++++++++
 .../WildcardProjectionPartialLoadMatcherTest.java  | 454 +++++++++++++++++++++
 .../druid/server/http/DataSourcesResourceTest.java | 230 ++++++++++-
 17 files changed, 2439 insertions(+), 2 deletions(-)

diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/CannotMatchBehavior.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/CannotMatchBehavior.java
new file mode 100644
index 00000000000..4da44b7361c
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/CannotMatchBehavior.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonValue;
+
+import javax.annotation.Nullable;
+
+/**
+ * Controls what happens when a {@link PartialLoadRule}'s {@link 
PartialLoadMatcher} does not apply to a given segment,
+ * for example, a {@link ProjectionPartialLoadMatcher} when faced with a 
segment that doesn't have projections.
+ * <p>
+ * Unknown values deserialize to {@code null} so that an older coordinator 
encountering a rule authored on a newer
+ * version that introduces a new behavior falls back to the constructor's 
default ({@link #FULL_LOAD}) rather than
+ * failing to parse the rule.
+ */
+public enum CannotMatchBehavior
+{
+  /**
+   * The rule does not apply; the cascade continues to the next rule.
+   */
+  FALL_THROUGH("fallThrough"),
+
+  /**
+   * The rule applies and the segment is loaded in full on this tier.
+   */
+  FULL_LOAD("fullLoad");
+
+  private final String id;
+
+  CannotMatchBehavior(String id)
+  {
+    this.id = id;
+  }
+
+  @JsonCreator
+  @Nullable
+  public static CannotMatchBehavior fromString(final String id)
+  {
+    if (id == null) {
+      return null;
+    }
+    for (CannotMatchBehavior behavior : values()) {
+      if (behavior.id.equals(id)) {
+        return behavior;
+      }
+    }
+
+    return null;
+  }
+
+  @JsonValue
+  public String getId()
+  {
+    return id;
+  }
+
+  @Override
+  public String toString()
+  {
+    return id;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/ExactProjectionPartialLoadMatcher.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/ExactProjectionPartialLoadMatcher.java
new file mode 100644
index 00000000000..4526db3a6a3
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/ExactProjectionPartialLoadMatcher.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.timeline.DataSegment;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeSet;
+
+/**
+ * Selects partial load of projections whose names appear in the configured 
{@link #names} list.
+ */
+public class ExactProjectionPartialLoadMatcher extends 
ProjectionPartialLoadMatcher
+{
+  public static final String TYPE = "exactProjection";
+
+  private final List<String> names;
+
+  @JsonCreator
+  public ExactProjectionPartialLoadMatcher(@JsonProperty("names") List<String> 
names)
+  {
+    if (names == null || names.isEmpty()) {
+      throw InvalidInput.exception("names must not be null or empty for 
exactProjection matcher");
+    }
+    this.names = List.copyOf(names);
+  }
+
+  @JsonProperty
+  public List<String> getNames()
+  {
+    return names;
+  }
+
+  @Override
+  protected List<String> resolveProjectionNames(DataSegment segment)
+  {
+    final List<String> segmentProjections = segment.getProjections();
+    if (segmentProjections == null || segmentProjections.isEmpty()) {
+      return Collections.emptyList();
+    }
+    final Set<String> present = new HashSet<>(segmentProjections);
+    final TreeSet<String> intersected = new TreeSet<>();
+    for (String name : names) {
+      if (present.contains(name)) {
+        intersected.add(name);
+      }
+    }
+    return new ArrayList<>(intersected);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ExactProjectionPartialLoadMatcher that = 
(ExactProjectionPartialLoadMatcher) o;
+    return Objects.equals(names, that.names);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(names);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "ExactProjectionPartialLoadMatcher{names=" + names + "}";
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverPartialLoadRule.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverPartialLoadRule.java
new file mode 100644
index 00000000000..c11873cc941
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/ForeverPartialLoadRule.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * Forever variant of {@link PartialLoadRule}. Mirrors {@link ForeverLoadRule} 
(always load) and layers on
+ * {@link PartialLoadMatcher} for selection.
+ */
+public class ForeverPartialLoadRule extends PartialLoadRule
+{
+  public static final String TYPE = "loadPartialForever";
+
+  @JsonCreator
+  public ForeverPartialLoadRule(
+      @JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants,
+      @JsonProperty("useDefaultTierForNull") @Nullable Boolean 
useDefaultTierForNull,
+      @JsonProperty("matcher") PartialLoadMatcher matcher,
+      @JsonProperty("onCannotMatch") @Nullable CannotMatchBehavior 
onCannotMatch
+  )
+  {
+    super(tieredReplicants, useDefaultTierForNull, matcher, onCannotMatch);
+  }
+
+  @Override
+  @JsonProperty
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @Override
+  public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
+  {
+    return true;
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalPartialLoadRule.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalPartialLoadRule.java
new file mode 100644
index 00000000000..3df00418bac
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/IntervalPartialLoadRule.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Interval-based variant of {@link PartialLoadRule}. Mirrors {@link 
IntervalLoadRule} for time-window semantics and
+ * layers on {@link PartialLoadMatcher} for selection.
+ */
+public class IntervalPartialLoadRule extends PartialLoadRule
+{
+  public static final String TYPE = "loadPartialByInterval";
+
+  private final Interval interval;
+
+  @JsonCreator
+  public IntervalPartialLoadRule(
+      @JsonProperty("interval") Interval interval,
+      @JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants,
+      @JsonProperty("useDefaultTierForNull") @Nullable Boolean 
useDefaultTierForNull,
+      @JsonProperty("matcher") PartialLoadMatcher matcher,
+      @JsonProperty("onCannotMatch") @Nullable CannotMatchBehavior 
onCannotMatch
+  )
+  {
+    super(tieredReplicants, useDefaultTierForNull, matcher, onCannotMatch);
+    this.interval = interval;
+  }
+
+  @Override
+  @JsonProperty
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @JsonProperty
+  public Interval getInterval()
+  {
+    return interval;
+  }
+
+  @Override
+  public boolean appliesTo(Interval theInterval, DateTime referenceTimestamp)
+  {
+    return Rules.eligibleForLoad(interval, theInterval);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    IntervalPartialLoadRule that = (IntervalPartialLoadRule) o;
+    return Objects.equals(interval, that.interval);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), interval);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
new file mode 100644
index 00000000000..fd7cfc3cc5b
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcher.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.annotation.JsonSubTypes;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+import org.apache.druid.segment.loading.LoadSpec;
+import org.apache.druid.server.coordination.SegmentChangeRequestLoad;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+
+/**
+ * Decides whether a {@link PartialLoadRule} should partially load a given 
segment and, when it should, produces the
+ * wire-form load-spec wrapper plus a fingerprint identifying that request. 
Implementations encapsulate both the
+ * configuration that drives the decision and the wire format of their 
corresponding {@link LoadSpec} wrapper, so the
+ * rule layer stays scheme-agnostic.
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", defaultImpl = 
UnknownPartialLoadMatcher.class)
+@JsonSubTypes({
+    @JsonSubTypes.Type(name = ExactProjectionPartialLoadMatcher.TYPE, value = 
ExactProjectionPartialLoadMatcher.class),
+    @JsonSubTypes.Type(name = WildcardProjectionPartialLoadMatcher.TYPE, value 
= WildcardProjectionPartialLoadMatcher.class)
+})
+public interface PartialLoadMatcher
+{
+  /**
+   * Returns the {@link MatchResult} this matcher produces for the given 
segment, or null if the matcher does not apply
+   * to the segment. When null, {@link PartialLoadRule} consults {@link 
CannotMatchBehavior} to decide whether the rule
+   * falls through or full-loads.
+   */
+  @Nullable
+  MatchResult match(DataSegment segment, Map<String, Object> baseLoadSpec);
+
+  /**
+   * Output of {@link #match(DataSegment, Map)} when the matcher applies. 
Carries the wrapped load-spec map (ready to
+   * be stamped onto an outbound {@link SegmentChangeRequestLoad}) and the 
fingerprint used by the coordinator to
+   * reconcile loaded replicas against the rule that requested them.
+   */
+  record MatchResult(Map<String, Object> wrappedLoadSpec, String fingerprint)
+  {
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadRule.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadRule.java
new file mode 100644
index 00000000000..052389d72f4
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PartialLoadRule.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.common.config.Configs;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.timeline.DataSegment;
+import org.joda.time.DateTime;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Base class for rules that load only a subset of a segment on a tier. Pairs 
a {@link PartialLoadMatcher} (which
+ * produces the wrapped load-spec wire form and an accounting fingerprint when 
it applies to a segment) with a
+ * {@link CannotMatchBehavior} that controls whether the rule falls through or 
full-loads when the matcher does not
+ * apply.
+ */
+public abstract class PartialLoadRule extends LoadRule
+{
+  private final PartialLoadMatcher matcher;
+  private final CannotMatchBehavior onCannotMatch;
+
+  protected PartialLoadRule(
+      Map<String, Integer> tieredReplicants,
+      @Nullable Boolean useDefaultTierForNull,
+      PartialLoadMatcher matcher,
+      @Nullable CannotMatchBehavior onCannotMatch
+  )
+  {
+    super(tieredReplicants, useDefaultTierForNull);
+    if (matcher == null) {
+      throw InvalidInput.exception("matcher must not be null for a partial 
load rule");
+    }
+    this.matcher = matcher;
+    this.onCannotMatch = Configs.valueOrDefault(onCannotMatch, 
CannotMatchBehavior.FULL_LOAD);
+  }
+
+  @JsonProperty
+  public PartialLoadMatcher getMatcher()
+  {
+    return matcher;
+  }
+
+  @JsonProperty
+  public CannotMatchBehavior getOnCannotMatch()
+  {
+    return onCannotMatch;
+  }
+
+  @Override
+  public boolean isIntervalBased()
+  {
+    return false;
+  }
+
+  @Override
+  public boolean appliesTo(DataSegment segment, DateTime referenceTimestamp)
+  {
+    if (!appliesTo(segment.getInterval(), referenceTimestamp)) {
+      return false;
+    }
+    final PartialLoadMatcher.MatchResult result = matcher.match(segment, 
segment.getLoadSpec());
+    if (result != null) {
+      return true;
+    }
+    return onCannotMatch == CannotMatchBehavior.FULL_LOAD;
+  }
+
+  @Override
+  public void run(DataSegment segment, SegmentActionHandler handler)
+  {
+    // Partial plumbing is added in future work. For now, a partial rule that 
applies to a segment full-loads it,
+    // identical behavior to the corresponding non-partial rule
+    handler.replicateSegment(segment, getTieredReplicants());
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    PartialLoadRule that = (PartialLoadRule) o;
+    return onCannotMatch == that.onCannotMatch
+        && Objects.equals(matcher, that.matcher);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), matcher, onCannotMatch);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodPartialLoadRule.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodPartialLoadRule.java
new file mode 100644
index 00000000000..db8d17c7745
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/PeriodPartialLoadRule.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * Period-based variant of {@link PartialLoadRule}. Mirrors {@link 
PeriodLoadRule} for time-window semantics and layers
+ * on {@link PartialLoadMatcher} for selection.
+ */
+public class PeriodPartialLoadRule extends PartialLoadRule
+{
+  public static final String TYPE = "loadPartialByPeriod";
+
+  private final Period period;
+  private final boolean includeFuture;
+
+  @JsonCreator
+  public PeriodPartialLoadRule(
+      @JsonProperty("period") Period period,
+      @JsonProperty("includeFuture") @Nullable Boolean includeFuture,
+      @JsonProperty("tieredReplicants") Map<String, Integer> tieredReplicants,
+      @JsonProperty("useDefaultTierForNull") @Nullable Boolean 
useDefaultTierForNull,
+      @JsonProperty("matcher") PartialLoadMatcher matcher,
+      @JsonProperty("onCannotMatch") @Nullable CannotMatchBehavior 
onCannotMatch
+  )
+  {
+    super(tieredReplicants, useDefaultTierForNull, matcher, onCannotMatch);
+    this.period = period;
+    this.includeFuture = includeFuture == null ? 
PeriodLoadRule.DEFAULT_INCLUDE_FUTURE : includeFuture;
+  }
+
+  @Override
+  @JsonProperty
+  public String getType()
+  {
+    return TYPE;
+  }
+
+  @JsonProperty
+  public Period getPeriod()
+  {
+    return period;
+  }
+
+  @JsonProperty
+  public boolean isIncludeFuture()
+  {
+    return includeFuture;
+  }
+
+  @Override
+  public boolean appliesTo(Interval interval, DateTime referenceTimestamp)
+  {
+    return Rules.eligibleForLoad(period, interval, referenceTimestamp, 
includeFuture);
+  }
+
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    if (!super.equals(o)) {
+      return false;
+    }
+    PeriodPartialLoadRule that = (PeriodPartialLoadRule) o;
+    return includeFuture == that.includeFuture && Objects.equals(period, 
that.period);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(super.hashCode(), period, includeFuture);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/ProjectionPartialLoadMatcher.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/ProjectionPartialLoadMatcher.java
new file mode 100644
index 00000000000..04851a0b865
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/ProjectionPartialLoadMatcher.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.google.common.hash.Hasher;
+import com.google.common.hash.Hashing;
+import com.google.common.io.BaseEncoding;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * Base for {@link PartialLoadMatcher} implementations that decide which of a 
segment's V10 projections to load.
+ * Subclasses supply the resolution policy via {@link 
#resolveProjectionNames(DataSegment)}; this base handles
+ * fingerprint computation and wraps the result into the {@code 
partialProjection} load-spec wire form consumed
+ * by the historical-side {@code PartialProjectionLoadSpec}.
+ * <p>
+ * The fingerprint is a hash of what projections are partially loaded on a 
segment by this rule; the data node will
+ * include this value in the segment announcement so that it can be used as a 
lightweight value to compare against
+ * to handle things like rule change so that we can ensure that the 'right' 
partial load is in place from run to run.
+ */
+public abstract class ProjectionPartialLoadMatcher implements 
PartialLoadMatcher
+{
+  static final String LOAD_SPEC_TYPE = "partialProjection";
+  static final String FINGERPRINT_VERSION = "v1";
+
+  /**
+   * Returns the sorted, deduped list of projection names from {@link 
DataSegment#getProjections()} that this matcher
+   * selects. Returns an empty list when nothing matches (the segment exposes 
no projections, or no configured pattern
+   * intersects what the segment has).
+   */
+  protected abstract List<String> resolveProjectionNames(DataSegment segment);
+
+  @Override
+  @Nullable
+  public MatchResult match(DataSegment segment, Map<String, Object> 
baseLoadSpec)
+  {
+    final List<String> resolved = resolveProjectionNames(segment);
+    if (resolved.isEmpty()) {
+      return null;
+    }
+    final String fingerprint = computeFingerprint(resolved);
+    final Map<String, Object> wrapped = Map.of(
+        "type", LOAD_SPEC_TYPE,
+        "delegate", baseLoadSpec,
+        "projections", resolved,
+        "fingerprint", fingerprint
+    );
+    return new MatchResult(wrapped, fingerprint);
+  }
+
+  static String computeFingerprint(List<String> sortedDedupedNames)
+  {
+    final Hasher hasher = Hashing.sha256().newHasher();
+    for (String name : sortedDedupedNames) {
+      hasher.putUnencodedChars(name);
+      hasher.putByte((byte) 0);
+    }
+    final String hex = 
BaseEncoding.base16().encode(hasher.hash().asBytes()).toLowerCase(Locale.ROOT);
+    // should be good enough without dragging the whole thing around for every 
segment
+    return FINGERPRINT_VERSION + ":" + hex.substring(0, 16);
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
index a66101d0fe1..0b9988c4ce7 100644
--- a/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
+++ b/server/src/main/java/org/apache/druid/server/coordinator/rules/Rule.java
@@ -19,6 +19,7 @@
 
 package org.apache.druid.server.coordinator.rules;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonSubTypes;
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 import org.apache.druid.timeline.DataSegment;
@@ -33,6 +34,9 @@ import org.joda.time.Interval;
     @JsonSubTypes.Type(name = "loadByPeriod", value = PeriodLoadRule.class),
     @JsonSubTypes.Type(name = "loadByInterval", value = 
IntervalLoadRule.class),
     @JsonSubTypes.Type(name = "loadForever", value = ForeverLoadRule.class),
+    @JsonSubTypes.Type(name = PeriodPartialLoadRule.TYPE, value = 
PeriodPartialLoadRule.class),
+    @JsonSubTypes.Type(name = IntervalPartialLoadRule.TYPE, value = 
IntervalPartialLoadRule.class),
+    @JsonSubTypes.Type(name = ForeverPartialLoadRule.TYPE, value = 
ForeverPartialLoadRule.class),
     @JsonSubTypes.Type(name = "dropByPeriod", value = PeriodDropRule.class),
     @JsonSubTypes.Type(name = "dropBeforeByPeriod", value = 
PeriodDropBeforeRule.class),
     @JsonSubTypes.Type(name = "dropByInterval", value = 
IntervalDropRule.class),
@@ -49,5 +53,21 @@ public interface Rule
 
   boolean appliesTo(Interval interval, DateTime referenceTimestamp);
 
+  /**
+   * @return Whether {@link #appliesTo(DataSegment, DateTime)} is equivalent 
to {@link #appliesTo(Interval, DateTime)}
+   * for this rule, i.e. the rule's applicability decision depends only on the 
segment's interval and not on any other
+   * segment-specific information. Defaults to {@code true}; subclasses that 
consult the segment beyond its interval
+   * (such as {@link PartialLoadRule} which inspects the segment to decide 
what to load) override to return
+   * {@code false}.
+   * <p>
+   * Callers that only have an interval available can use this to safely use 
the cheaper interval-only path when every
+   * rule in the cascade is interval-based.
+   */
+  @JsonIgnore
+  default boolean isIntervalBased()
+  {
+    return true;
+  }
+
   void run(DataSegment segment, SegmentActionHandler segmentHandler);
 }
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/UnknownPartialLoadMatcher.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/UnknownPartialLoadMatcher.java
new file mode 100644
index 00000000000..0ca2169196a
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/UnknownPartialLoadMatcher.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import org.apache.druid.java.util.common.logger.Logger;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Placeholder {@link PartialLoadMatcher} used as the {@code defaultImpl}, so that an older Druid version can still
+ * parse a rule whose matcher type was introduced in a newer version.
+ * <p>
+ * {@link #match} never matches: it always returns {@code null}. The owning rule then defers to its configured
+ * {@link CannotMatchBehavior}, so whether the segment falls through the cascade or gets full-loaded on this tier is
+ * up to the rule author. Each instance logs a single warning the first time {@link #match} is called, which surfaces
+ * the configuration mismatch without flooding the logs across coordinator passes.
+ * <p>
+ * Note: this matcher is not lossless to round-trip. An older coordinator that reads a rule with an unknown matcher
+ * type and then re-serializes it will not preserve the original {@code type} discriminator or any matcher-specific
+ * configuration. The expected operational pattern is to upgrade the coordinator to a version that recognizes the
+ * matcher rather than rely on round-trip.
+ */
+public class UnknownPartialLoadMatcher implements PartialLoadMatcher
+{
+  private static final Logger log = new Logger(UnknownPartialLoadMatcher.class);
+
+  /** Flips to {@code true} once this instance has emitted its warning. */
+  private final AtomicBoolean warningLogged = new AtomicBoolean(false);
+
+  /**
+   * Always reports "matcher does not apply".
+   *
+   * @return always {@code null}
+   */
+  @Override
+  @Nullable
+  public MatchResult match(DataSegment segment, Map<String, Object> baseLoadSpec)
+  {
+    // getAndSet returns the previous value, so only the first caller observes false and logs.
+    final boolean alreadyLogged = warningLogged.getAndSet(true);
+    if (!alreadyLogged) {
+      log.warn(
+          "Encountered an unknown PartialLoadMatcher type in a partial load rule. "
+          + "The matcher will be treated as not applicable; the rule's onCannotMatch behavior determines the outcome. "
+          + "Upgrade Druid to a version that supports this matcher."
+      );
+    }
+    return null;
+  }
+
+  /**
+   * All instances are interchangeable: the "already warned" flag is deliberately not part of equality.
+   */
+  @Override
+  public boolean equals(Object o)
+  {
+    return o instanceof UnknownPartialLoadMatcher;
+  }
+
+  /**
+   * Constant per class, consistent with {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode()
+  {
+    return UnknownPartialLoadMatcher.class.hashCode();
+  }
+
+  @Override
+  public String toString()
+  {
+    return "UnknownPartialLoadMatcher{}";
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcher.java
 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcher.java
new file mode 100644
index 00000000000..934eaf6f958
--- /dev/null
+++ 
b/server/src/main/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcher.java
@@ -0,0 +1,218 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.druid.error.InvalidInput;
+import org.apache.druid.timeline.DataSegment;
+
+import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.TreeSet;
+import java.util.regex.Pattern;
+
+/**
+ * Selects projections whose names match any of the configured glob patterns, minus any names matching an entry in
+ * {@code excludePatterns}. Supported glob metacharacters:
+ * <ul>
+ *   <li>{@code *} — any sequence of characters (including empty)</li>
+ *   <li>{@code ?} — any single character</li>
+ *   <li>{@code \} — escapes the following character so it is treated literally; use {@code \*}, {@code \?}, or
+ *       {@code \\} to match a literal {@code *}, {@code ?}, or {@code \}. A trailing unescaped {@code \} is
+ *       rejected at construction.</li>
+ * </ul>
+ * All other characters are literal; regex metacharacters are escaped automatically. A literal projection name is
+ * a valid (zero-wildcard) glob, so the same {@code excludePatterns} field covers both "exclude this specific name"
+ * and "exclude anything matching this pattern."
+ * <p>
+ * The wildcards are compiled with {@link Pattern#DOTALL}, so "any character" includes line terminators. A
+ * {@code null} entry in either pattern list is rejected at construction as invalid input.
+ * <p>
+ * For example, a long-retention rule {@code patterns=["user_*"], excludePatterns=["user_daily"]} keeps every
+ * {@code user_*} projection except {@code user_daily} (which is expected to live on a shorter-retention rule). A
+ * broad rule {@code patterns=["*"], excludePatterns=["user_*"]} loads every projection except those handled by a
+ * more specific {@code user_*} rule elsewhere in the cascade.
+ */
+public class WildcardProjectionPartialLoadMatcher extends ProjectionPartialLoadMatcher
+{
+  public static final String TYPE = "globProjection";
+
+  private final List<String> patterns;
+  private final List<String> excludePatterns;
+  private final List<Pattern> compiledPatterns;
+  private final List<Pattern> compiledExcludePatterns;
+
+  /**
+   * @param patterns        glob patterns selecting projection names; must be non-null, non-empty, and free of
+   *                        {@code null} entries
+   * @param excludePatterns glob patterns whose matches are removed from the selection; {@code null} is treated as
+   *                        empty, and {@code null} entries are rejected
+   * @throws org.apache.druid.error.DruidException (invalid input) if the lists violate the constraints above or any
+   *                                               glob ends with an unescaped backslash
+   */
+  @JsonCreator
+  public WildcardProjectionPartialLoadMatcher(
+      @JsonProperty("patterns") List<String> patterns,
+      @JsonProperty("excludePatterns") @Nullable List<String> excludePatterns
+  )
+  {
+    if (patterns == null || patterns.isEmpty()) {
+      throw InvalidInput.exception("patterns must not be null or empty for globProjection matcher");
+    }
+    this.patterns = copyRejectingNulls("patterns", patterns);
+    this.excludePatterns = excludePatterns == null
+                           ? List.of()
+                           : copyRejectingNulls("excludePatterns", excludePatterns);
+    this.compiledPatterns = compileAll(this.patterns);
+    this.compiledExcludePatterns = compileAll(this.excludePatterns);
+  }
+
+  @JsonProperty
+  public List<String> getPatterns()
+  {
+    return patterns;
+  }
+
+  @JsonProperty
+  @JsonInclude(JsonInclude.Include.NON_EMPTY)
+  public List<String> getExcludePatterns()
+  {
+    return excludePatterns;
+  }
+
+  /**
+   * Returns the segment's projection names that match at least one include pattern and no exclude pattern, sorted
+   * in natural {@link String} order and de-duplicated. Returns an empty list when the segment reports no projections.
+   */
+  @Override
+  protected List<String> resolveProjectionNames(DataSegment segment)
+  {
+    final List<String> segmentProjections = segment.getProjections();
+    if (segmentProjections == null || segmentProjections.isEmpty()) {
+      return Collections.emptyList();
+    }
+    // TreeSet gives a deterministic (sorted, duplicate-free) result regardless of the segment's listing order.
+    final TreeSet<String> matched = new TreeSet<>();
+    for (String name : segmentProjections) {
+      // Exclusions win over inclusions.
+      if (matchesAny(name, compiledExcludePatterns)) {
+        continue;
+      }
+      if (matchesAny(name, compiledPatterns)) {
+        matched.add(name);
+      }
+    }
+    return new ArrayList<>(matched);
+  }
+
+  /**
+   * Returns an immutable copy of {@code globs}, reporting a {@code null} entry as invalid input rather than letting
+   * {@link List#copyOf} surface it as a bare {@link NullPointerException}.
+   */
+  private static List<String> copyRejectingNulls(String fieldName, List<String> globs)
+  {
+    for (String glob : globs) {
+      if (glob == null) {
+        throw InvalidInput.exception("%s must not contain null entries for globProjection matcher", fieldName);
+      }
+    }
+    return List.copyOf(globs);
+  }
+
+  /**
+   * Compiles every glob in {@code globs} once, up front, so matching does not recompile per segment.
+   */
+  private static List<Pattern> compileAll(List<String> globs)
+  {
+    if (globs.isEmpty()) {
+      return List.of();
+    }
+    final List<Pattern> compiled = new ArrayList<>(globs.size());
+    for (String glob : globs) {
+      // DOTALL: without it '.' skips line terminators, contradicting the documented "any character" semantics
+      // of '*' and '?'.
+      compiled.add(Pattern.compile(globToRegex(glob), Pattern.DOTALL));
+    }
+    return List.copyOf(compiled);
+  }
+
+  /**
+   * @return {@code true} if {@code name} is matched in its entirety by at least one of {@code patterns}
+   */
+  private static boolean matchesAny(String name, List<Pattern> patterns)
+  {
+    for (Pattern pattern : patterns) {
+      if (pattern.matcher(name).matches()) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Translates a glob pattern with {@code *}, {@code ?}, and {@code \} escape semantics into an equivalent regex
+   * pattern that matches the entire input string. Regex metacharacters in literal positions are escaped.
+   *
+   * @throws org.apache.druid.error.DruidException if {@code glob} ends with an unescaped backslash
+   */
+  static String globToRegex(String glob)
+  {
+    final StringBuilder sb = new StringBuilder(glob.length() + 4);
+    // True while the previous character was an unescaped backslash, i.e. the current one must be taken literally.
+    boolean escaping = false;
+    for (int i = 0; i < glob.length(); i++) {
+      final char c = glob.charAt(i);
+      if (escaping) {
+        appendLiteral(sb, c);
+        escaping = false;
+        continue;
+      }
+      switch (c) {
+        case '\\':
+          escaping = true;
+          break;
+        case '*':
+          sb.append(".*");
+          break;
+        case '?':
+          sb.append('.');
+          break;
+        default:
+          appendLiteral(sb, c);
+      }
+    }
+    if (escaping) {
+      throw InvalidInput.exception("Glob pattern [%s] ends with an unescaped backslash", glob);
+    }
+    return sb.toString();
+  }
+
+  /**
+   * Appends {@code c} so that the regex engine treats it as a literal, backslash-escaping regex metacharacters.
+   */
+  private static void appendLiteral(StringBuilder sb, char c)
+  {
+    switch (c) {
+      case '.':
+      case '(':
+      case ')':
+      case '[':
+      case ']':
+      case '{':
+      case '}':
+      case '+':
+      case '|':
+      case '^':
+      case '$':
+      case '\\':
+      case '*':
+      case '?':
+        sb.append('\\').append(c);
+        break;
+      default:
+        sb.append(c);
+    }
+  }
+
+  /**
+   * Equality is defined over the configured glob strings only; the compiled patterns are derived from them.
+   */
+  @Override
+  public boolean equals(Object o)
+  {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    WildcardProjectionPartialLoadMatcher that = (WildcardProjectionPartialLoadMatcher) o;
+    return Objects.equals(patterns, that.patterns) && Objects.equals(excludePatterns, that.excludePatterns);
+  }
+
+  @Override
+  public int hashCode()
+  {
+    return Objects.hash(patterns, excludePatterns);
+  }
+
+  @Override
+  public String toString()
+  {
+    return "WildcardProjectionPartialLoadMatcher{patterns=" + patterns + ", excludePatterns=" + excludePatterns + "}";
+  }
+}
diff --git 
a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java 
b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
index 3fdf3eee9c7..5a3f5decf13 100644
--- a/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
+++ b/server/src/main/java/org/apache/druid/server/http/DataSourcesResource.java
@@ -19,6 +19,8 @@
 
 package org.apache.druid.server.http;
 
+import com.google.common.base.Supplier;
+import com.google.common.base.Suppliers;
 import com.google.common.base.Throwables;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
@@ -918,11 +920,33 @@ public class DataSourcesResource
       final Interval theInterval = Intervals.of(interval);
       final SegmentDescriptor descriptor = new SegmentDescriptor(theInterval, 
version, partitionNumber);
       final DateTime now = DateTimes.nowUtc();
+      // Walk the cascade. Interval-based rules (most rules) can be evaluated 
from the interval alone; segment-aware
+      // rules (e.g., PartialLoadRule whose matcher inspects projections) 
require the actual segment, which we look
+      // up from metadata lazily and only if a segment-aware rule shows up. 
The segment lookup uses the recent cached
+      // snapshot first, then a forced refresh on miss to disambiguate "stale 
cache" from "never published."
+      final SegmentId segmentId = SegmentId.of(dataSourceName, theInterval, 
version, partitionNumber);
+      final Supplier<DataSegment> segmentSupplier = Suppliers.memoize(
+          () -> {
+            DataSegment s = 
lookupSegment(segmentsMetadataManager.getRecentDataSourcesSnapshot(), 
segmentId);
+            return s != null ? s : 
lookupSegment(segmentsMetadataManager.forceUpdateDataSourcesSnapshot(), 
segmentId);
+          }
+      );
 
       // A segment that is not eligible for load will never be handed off
       boolean eligibleForLoad = false;
       for (Rule rule : rules) {
-        if (rule.appliesTo(theInterval, now)) {
+        final boolean applies;
+        if (rule.isIntervalBased()) {
+          applies = rule.appliesTo(theInterval, now);
+        } else {
+          final DataSegment segment = segmentSupplier.get();
+          // Segment isn't published in metadata (and a forced refresh didn't 
find it); it will never be handed off.
+          if (segment == null) {
+            return Response.ok(true).build();
+          }
+          applies = rule.appliesTo(segment, now);
+        }
+        if (applies) {
           eligibleForLoad = rule instanceof LoadRule && ((LoadRule) 
rule).shouldMatchingSegmentBeLoaded();
           break;
         }
@@ -961,6 +985,16 @@ public class DataSourcesResource
     }
   }
 
+  /**
+   * Looks up a segment by id in the given snapshot.
+   *
+   * @param snapshot  snapshot to search; may be {@code null}
+   * @param segmentId id of the segment to find; its datasource name selects the datasource within the snapshot
+   * @return {@code null} when {@code snapshot} is {@code null} or has no entry for the segment's datasource;
+   * otherwise the result of that datasource's {@code getSegment(segmentId)}
+   */
+  @Nullable
+  private static DataSegment lookupSegment(@Nullable DataSourcesSnapshot snapshot, SegmentId segmentId)
+  {
+    final ImmutableDruidDataSource dataSource =
+        snapshot == null ? null : snapshot.getDataSource(segmentId.getDataSource());
+    if (dataSource == null) {
+      return null;
+    }
+    return dataSource.getSegment(segmentId);
+  }
+
   static boolean isSegmentLoaded(Iterable<ImmutableSegmentLoadInfo> 
servedSegments, SegmentDescriptor descriptor)
   {
     for (ImmutableSegmentLoadInfo segmentLoadInfo : servedSegments) {
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/ExactProjectionPartialLoadMatcherTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/ExactProjectionPartialLoadMatcherTest.java
new file mode 100644
index 00000000000..ade05388eb1
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/ExactProjectionPartialLoadMatcherTest.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Behavior tests for {@link ExactProjectionPartialLoadMatcher}: matching against a segment's projections,
+ * constructor validation, JSON round-trip, and the equals/hashCode contract.
+ */
+public class ExactProjectionPartialLoadMatcherTest
+{
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+
+  private static final Map<String, Object> BASE_LOAD_SPEC = Map.of(
+      "type", "local",
+      "path", "/var/druid/segments/foo"
+  );
+
+  // Immutable id shared by all tests. The builder itself is created per call in segmentWithProjections so that no
+  // mutable state is shared between tests.
+  private static final SegmentId SEGMENT_ID = SegmentId.of(
+      "test",
+      Intervals.of("2024-01-01/2024-02-01"),
+      DateTimes.nowUtc().toString(),
+      new NumberedShardSpec(0, 0)
+  );
+
+  @Test
+  void testMatchProducesResultWhenIntersectionNonEmpty()
+  {
+    ExactProjectionPartialLoadMatcher matcher = new ExactProjectionPartialLoadMatcher(
+        List.of("c", "a", "x")
+    );
+    DataSegment segment = segmentWithProjections(List.of("a", "b", "c"));
+
+    PartialLoadMatcher.MatchResult result = matcher.match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(result);
+
+    Map<String, Object> wrapped = result.wrappedLoadSpec();
+    Assertions.assertEquals("partialProjection", wrapped.get("type"));
+    Assertions.assertEquals(BASE_LOAD_SPEC, wrapped.get("delegate"));
+    // Only names present on the segment survive, in sorted order ("x" is not on the segment).
+    Assertions.assertEquals(List.of("a", "c"), wrapped.get("projections"));
+    Assertions.assertEquals(result.fingerprint(), wrapped.get("fingerprint"));
+    Assertions.assertTrue(result.fingerprint().startsWith("v1:"));
+    Assertions.assertEquals("v1:".length() + 16, result.fingerprint().length());
+  }
+
+  @Test
+  void testMatchReturnsNullWhenNoIntersection()
+  {
+    ExactProjectionPartialLoadMatcher matcher = new ExactProjectionPartialLoadMatcher(
+        List.of("x", "y")
+    );
+    DataSegment segment = segmentWithProjections(List.of("a", "b"));
+    Assertions.assertNull(matcher.match(segment, segment.getLoadSpec()));
+  }
+
+  @Test
+  void testMatchReturnsNullForProjectionAgnosticSegment()
+  {
+    ExactProjectionPartialLoadMatcher matcher = new ExactProjectionPartialLoadMatcher(
+        List.of("a")
+    );
+    DataSegment segment = segmentWithProjections(null);
+    Assertions.assertNull(matcher.match(segment, segment.getLoadSpec()));
+  }
+
+  @Test
+  void testMatchReturnsNullForEmptyProjectionsList()
+  {
+    ExactProjectionPartialLoadMatcher matcher = new ExactProjectionPartialLoadMatcher(
+        List.of("a")
+    );
+    DataSegment segment = segmentWithProjections(Collections.emptyList());
+    Assertions.assertNull(matcher.match(segment, segment.getLoadSpec()));
+  }
+
+  @Test
+  void testConstructorRejectsNullNames()
+  {
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> new ExactProjectionPartialLoadMatcher(null)
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageContains("names must not be null or empty")
+    );
+  }
+
+  @Test
+  void testConstructorRejectsEmptyNames()
+  {
+    MatcherAssert.assertThat(
+        Assertions.assertThrows(
+            DruidException.class,
+            () -> new ExactProjectionPartialLoadMatcher(Collections.emptyList())
+        ),
+        DruidExceptionMatcher.invalidInput().expectMessageContains("names must not be null or empty")
+    );
+  }
+
+  @Test
+  void testMatchSortsAndDeduplicates()
+  {
+    ExactProjectionPartialLoadMatcher matcher = new ExactProjectionPartialLoadMatcher(
+        List.of("c", "a", "a", "b")
+    );
+    DataSegment segment = segmentWithProjections(List.of("a", "b", "c"));
+    PartialLoadMatcher.MatchResult result = matcher.match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(result);
+    Assertions.assertEquals(List.of("a", "b", "c"), result.wrappedLoadSpec().get("projections"));
+  }
+
+  @Test
+  void testSerde() throws Exception
+  {
+    ExactProjectionPartialLoadMatcher matcher = new ExactProjectionPartialLoadMatcher(
+        List.of("a", "b")
+    );
+    String json = OBJECT_MAPPER.writeValueAsString(matcher);
+    // Deserialize through the interface so the type discriminator is exercised.
+    PartialLoadMatcher reread = OBJECT_MAPPER.readValue(json, PartialLoadMatcher.class);
+    Assertions.assertEquals(matcher, reread);
+    Assertions.assertInstanceOf(ExactProjectionPartialLoadMatcher.class, reread);
+  }
+
+  @Test
+  void testEquals()
+  {
+    EqualsVerifier.forClass(ExactProjectionPartialLoadMatcher.class).usingGetClass().verify();
+  }
+
+  /**
+   * Builds a fresh test segment with the given projections (may be {@code null}) from a new builder on every call.
+   */
+  private static DataSegment segmentWithProjections(List<String> projections)
+  {
+    return DataSegment.builder(SEGMENT_ID)
+                      .loadSpec(BASE_LOAD_SPEC)
+                      .size(0)
+                      .projections(projections)
+                      .build();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcherTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcherTest.java
new file mode 100644
index 00000000000..9f861a9d3e4
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadMatcherTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Cross-cutting tests for the {@link PartialLoadMatcher} interface itself: fingerprint stability across matcher
+ * implementations, the {@link PartialLoadMatcher.MatchResult} value type, and {@link UnknownPartialLoadMatcher}
+ * (the {@code defaultImpl} fallback for unrecognized matcher types). Per-matcher behavior tests live in their own
+ * test classes (see {@link ExactProjectionPartialLoadMatcherTest},
+ * {@link WildcardProjectionPartialLoadMatcherTest}).
+ */
+public class PartialLoadMatcherTest
+{
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+
+  private static final Map<String, Object> BASE_LOAD_SPEC = Map.of(
+      "type", "local",
+      "path", "/var/druid/segments/foo"
+  );
+
+  // Immutable id shared by all tests. The builder itself is created per call in segmentWithProjections so that no
+  // mutable state is shared between tests.
+  private static final SegmentId SEGMENT_ID = SegmentId.of(
+      "test",
+      Intervals.of("2024-01-01/2024-02-01"),
+      DateTimes.nowUtc().toString(),
+      new NumberedShardSpec(0, 0)
+  );
+
+  @Test
+  void testFingerprintStableAcrossEquivalentInputOrderAndDuplicates()
+  {
+    // Two matcher configurations resolving to the same set on the same segment should yield identical fingerprints
+    // (the cascade should not thrash on equivalent rule rewordings).
+    DataSegment segment = segmentWithProjections(List.of("a", "b", "c"));
+    ExactProjectionPartialLoadMatcher orderA = new ExactProjectionPartialLoadMatcher(
+        List.of("a", "b")
+    );
+    ExactProjectionPartialLoadMatcher orderB = new ExactProjectionPartialLoadMatcher(
+        List.of("b", "b", "a")
+    );
+    Assertions.assertEquals(fingerprintOf(orderA, segment), fingerprintOf(orderB, segment));
+  }
+
+  @Test
+  void testFingerprintDiffersAcrossDifferentResolvedSets()
+  {
+    DataSegment segment = segmentWithProjections(List.of("a", "b", "c"));
+    String fingerprintAB = fingerprintOf(new ExactProjectionPartialLoadMatcher(List.of("a", "b")), segment);
+    String fingerprintAC = fingerprintOf(new ExactProjectionPartialLoadMatcher(List.of("a", "c")), segment);
+    Assertions.assertNotEquals(fingerprintAB, fingerprintAC);
+  }
+
+  @Test
+  void testFingerprintMatchesAcrossExactAndWildcardWhenResolvedSetIdentical()
+  {
+    // Exact {a,ab} and Wildcard {a*} resolving to the same set on this segment should produce the same fingerprint,
+    // since fingerprinting is over the resolved list, not the rule text.
+    DataSegment segment = segmentWithProjections(List.of("a", "ab"));
+    String fingerprintExact = fingerprintOf(new ExactProjectionPartialLoadMatcher(List.of("a", "ab")), segment);
+    String fingerprintGlob = fingerprintOf(new WildcardProjectionPartialLoadMatcher(List.of("a*"), null), segment);
+    Assertions.assertEquals(fingerprintExact, fingerprintGlob);
+  }
+
+  @Test
+  void testMatchResultEquals()
+  {
+    EqualsVerifier.forClass(PartialLoadMatcher.MatchResult.class).usingGetClass().verify();
+  }
+
+  @Test
+  void testUnknownTypeDeserializesToFallback() throws Exception
+  {
+    // A matcher type unknown to this Druid version should fall back to UnknownPartialLoadMatcher rather than fail to
+    // parse, so the rules cascade can keep working when an older coordinator reads a rule authored on a newer version.
+    String json = "{\"type\": \"someFutureMatcherType\", \"someFutureField\": [\"x\", \"y\"]}";
+    PartialLoadMatcher matcher = OBJECT_MAPPER.readValue(json, PartialLoadMatcher.class);
+    Assertions.assertInstanceOf(UnknownPartialLoadMatcher.class, matcher);
+  }
+
+  @Test
+  void testUnknownMatcherDoesNotApply()
+  {
+    // An unknown matcher returns null from match() so the rule's appliesTo defers to the rule's onCannotMatch
+    // behavior (FALL_THROUGH continues the cascade; FULL_LOAD applies the rule as a full load).
+    UnknownPartialLoadMatcher matcher = new UnknownPartialLoadMatcher();
+    DataSegment segment = segmentWithProjections(List.of("a", "b"));
+    Assertions.assertNull(matcher.match(segment, segment.getLoadSpec()));
+  }
+
+  @Test
+  void testUnknownMatcherEquals()
+  {
+    Assertions.assertEquals(new UnknownPartialLoadMatcher(), new UnknownPartialLoadMatcher());
+    Assertions.assertEquals(
+        new UnknownPartialLoadMatcher().hashCode(),
+        new UnknownPartialLoadMatcher().hashCode()
+    );
+  }
+
+  /**
+   * Runs {@code matcher} against {@code segment} and returns the resulting fingerprint. Asserts that the matcher
+   * matched, so a regression shows up as an assertion failure rather than a {@link NullPointerException}.
+   */
+  private static String fingerprintOf(PartialLoadMatcher matcher, DataSegment segment)
+  {
+    final PartialLoadMatcher.MatchResult result = matcher.match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(result, "expected the matcher to match the segment");
+    return result.fingerprint();
+  }
+
+  /**
+   * Builds a fresh test segment with the given projections from a new builder on every call.
+   */
+  private static DataSegment segmentWithProjections(List<String> projections)
+  {
+    return DataSegment.builder(SEGMENT_ID)
+                      .loadSpec(BASE_LOAD_SPEC)
+                      .size(0)
+                      .projections(projections)
+                      .build();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadRuleTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadRuleTest.java
new file mode 100644
index 00000000000..f7cf0f2bc07
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/PartialLoadRuleTest.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.client.DruidServer;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.hamcrest.MatcherAssert;
+import org.joda.time.DateTime;
+import org.joda.time.Interval;
+import org.joda.time.Period;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+public class PartialLoadRuleTest
+{
+  // JSON mapper used by the serde tests.
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+  // Fixed reference time passed to appliesTo(), so period-based checks do not depend on the wall clock.
+  private static final DateTime NOW = DateTimes.of("2024-06-01T00:00:00");
+  // Interval lying within the 30 days that precede NOW.
+  private static final Interval IN_WINDOW = 
Intervals.of("2024-05-15/2024-05-20");
+  // Interval several years before NOW, outside the 30-day period used by the tests.
+  private static final Interval OUT_OF_WINDOW = 
Intervals.of("2020-01-01/2020-02-01");
+
+  @Test
+  void testAppliesToOutsideWindowReturnsFalseRegardlessOfMatcher()
+  {
+    // The segment does carry projection "a", but its interval is far older than the 30-day period,
+    // so the rule must not apply.
+    final DataSegment staleSegment = segmentWithProjections(OUT_OF_WINDOW, List.of("a", "b"));
+    final PeriodPartialLoadRule thirtyDayRule =
+        new PeriodPartialLoadRule(new Period("P30D"), false, tier(1), null, exact("a"), null);
+    Assertions.assertFalse(thirtyDayRule.appliesTo(staleSegment, NOW));
+  }
+
+  @Test
+  void testAppliesToWindowMatchAndMatcherProducesResultReturnsTrue()
+  {
+    // Both conditions hold: the interval is inside the 30-day period and projection "a" exists on the segment.
+    final DataSegment recentSegment = segmentWithProjections(IN_WINDOW, List.of("a", "b"));
+    final PeriodPartialLoadRule thirtyDayRule =
+        new PeriodPartialLoadRule(new Period("P30D"), false, tier(1), null, exact("a"), null);
+    Assertions.assertTrue(thirtyDayRule.appliesTo(recentSegment, NOW));
+  }
+
+  @Test
+  void testAppliesToMatcherDoesNotApplyFallThroughReturnsFalse()
+  {
+    // In-window segment, but the requested projection is absent. With FALL_THROUGH the rule steps aside.
+    final DataSegment recentSegment = segmentWithProjections(IN_WINDOW, List.of("a", "b"));
+    final PeriodPartialLoadRule fallThroughRule = new PeriodPartialLoadRule(
+        new Period("P30D"),
+        false,
+        tier(1),
+        null,
+        exact("nonexistent"),
+        CannotMatchBehavior.FALL_THROUGH
+    );
+    Assertions.assertFalse(fallThroughRule.appliesTo(recentSegment, NOW));
+  }
+
+  @Test
+  void testAppliesToMatcherDoesNotApplyFullLoadIsDefault()
+  {
+    // Passing null for onCannotMatch selects the FULL_LOAD default. The segment has projections, none of which
+    // the matcher resolves, yet the rule still applies and the segment is full-loaded on this tier.
+    final DataSegment recentSegment = segmentWithProjections(IN_WINDOW, List.of("a", "b"));
+    final PeriodPartialLoadRule defaultBehaviorRule =
+        new PeriodPartialLoadRule(new Period("P30D"), false, tier(1), null, exact("nonexistent"), null);
+    Assertions.assertTrue(defaultBehaviorRule.appliesTo(recentSegment, NOW));
+  }
+
+  @Test
+  void testAppliesToProjectionAgnosticSegmentFallThrough()
+  {
+    // Segment without projection metadata (projections == null, as for pre-Druid-32 segments).
+    // Under FALL_THROUGH the rule does not apply to it.
+    final DataSegment agnosticSegment = segmentWithProjections(IN_WINDOW, null);
+    final PeriodPartialLoadRule fallThroughRule = new PeriodPartialLoadRule(
+        new Period("P30D"),
+        false,
+        tier(1),
+        null,
+        exact("a"),
+        CannotMatchBehavior.FALL_THROUGH
+    );
+    Assertions.assertFalse(fallThroughRule.appliesTo(agnosticSegment, NOW));
+  }
+
+  @Test
+  void testAppliesToProjectionAgnosticSegmentFullLoad()
+  {
+    // Same projection-less (pre-Druid-32 style) segment, but with the default behavior:
+    // the rule applies and the segment is full-loaded on this tier.
+    final DataSegment agnosticSegment = segmentWithProjections(IN_WINDOW, null);
+    final PeriodPartialLoadRule defaultBehaviorRule =
+        new PeriodPartialLoadRule(new Period("P30D"), false, tier(1), null, exact("a"), null);
+    Assertions.assertTrue(defaultBehaviorRule.appliesTo(agnosticSegment, NOW));
+  }
+
+  @Test
+  void testIntervalOverloadIgnoresMatcher()
+  {
+    // appliesTo(Interval, DateTime) receives no segment, so only the time window can be evaluated. The matcher
+    // names a projection that exists nowhere, and the outcome still follows the window alone.
+    final PeriodPartialLoadRule windowOnlyRule =
+        new PeriodPartialLoadRule(new Period("P30D"), false, tier(1), null, exact("nonexistent"), null);
+    Assertions.assertTrue(windowOnlyRule.appliesTo(IN_WINDOW, NOW));
+    Assertions.assertFalse(windowOnlyRule.appliesTo(OUT_OF_WINDOW, NOW));
+  }
+
+  @Test
+  void testCascadeFallThroughToFullLoad()
+  {
+    // Two-rule cascade on the same tier: a 30-day partial rule for projection "a" that explicitly falls through
+    // when its matcher cannot match, followed by a forever full-load rule that requires no projections.
+    final PartialLoadRule partialRule = new PeriodPartialLoadRule(
+        new Period("P30D"),
+        false,
+        tier(1),
+        null,
+        exact("a"),
+        CannotMatchBehavior.FALL_THROUGH
+    );
+    final ForeverLoadRule fullRule = new ForeverLoadRule(tier(1), null);
+
+    // A segment without projections is skipped by the partial rule and picked up by the full-load rule.
+    final DataSegment withoutProjections = segmentWithProjections(IN_WINDOW, null);
+    Assertions.assertFalse(partialRule.appliesTo(withoutProjections, NOW));
+    Assertions.assertTrue(fullRule.appliesTo(withoutProjections, NOW));
+
+    // A segment holding the requested projection is claimed by the partial rule, which ends the cascade there.
+    final DataSegment withProjections = segmentWithProjections(IN_WINDOW, List.of("a", "b"));
+    Assertions.assertTrue(partialRule.appliesTo(withProjections, NOW));
+  }
+
+  @Test
+  void testConstructorRejectsNullMatcher()
+  {
+    // A partial rule without a matcher is invalid input and must be refused at construction time.
+    final DruidException thrown = Assertions.assertThrows(
+        DruidException.class,
+        () -> new PeriodPartialLoadRule(new Period("P1D"), false, tier(1), null, null, null)
+    );
+    MatcherAssert.assertThat(
+        thrown,
+        DruidExceptionMatcher.invalidInput().expectMessageContains("matcher must not be null")
+    );
+  }
+
+  @Test
+  void testPeriodSerde() throws Exception
+  {
+    // Round trip through JSON via the Rule base type, with every constructor argument set explicitly.
+    final PeriodPartialLoadRule original = new PeriodPartialLoadRule(
+        new Period("P30D"),
+        false,
+        tier(2),
+        true,
+        exact("a", "b"),
+        CannotMatchBehavior.FULL_LOAD
+    );
+    final String serialized = OBJECT_MAPPER.writeValueAsString(original);
+    final Rule roundTripped = OBJECT_MAPPER.readValue(serialized, Rule.class);
+    Assertions.assertEquals(original, roundTripped);
+    Assertions.assertInstanceOf(PeriodPartialLoadRule.class, roundTripped);
+  }
+
+  @Test
+  void testPeriodSerdeDefaults() throws Exception
+  {
+    // Only type, period and matcher are given; every other property must take its default value.
+    final String minimalRuleJson = """
+        {\
+        "type": "loadPartialByPeriod",\
+        "period": "P30D",\
+        "matcher": {"type": "exactProjection", "names": ["a"]}\
+        }""";
+    final Rule parsed = OBJECT_MAPPER.readValue(minimalRuleJson, Rule.class);
+    final PeriodPartialLoadRule periodRule = (PeriodPartialLoadRule) parsed;
+    Assertions.assertEquals(CannotMatchBehavior.FULL_LOAD, periodRule.getOnCannotMatch());
+    Assertions.assertEquals(PeriodLoadRule.DEFAULT_INCLUDE_FUTURE, periodRule.isIncludeFuture());
+    Assertions.assertEquals(
+        Map.of(DruidServer.DEFAULT_TIER, DruidServer.DEFAULT_NUM_REPLICANTS),
+        periodRule.getTieredReplicants()
+    );
+  }
+
+  @Test
+  void testUnknownOnCannotMatchValueDeserializesToFullLoadDefault() throws Exception
+  {
+    // Models an older coordinator reading a rule written by a newer version that added a CannotMatchBehavior
+    // value. Deserialization must succeed, and the unrecognized value must end up as the FULL_LOAD default.
+    final String futureBehaviorJson = """
+        {\
+        "type": "loadPartialByPeriod",\
+        "period": "P30D",\
+        "matcher": {"type": "exactProjection", "names": ["a"]},\
+        "onCannotMatch": "SOME_FUTURE_BEHAVIOR"\
+        }""";
+    final Rule parsed = OBJECT_MAPPER.readValue(futureBehaviorJson, Rule.class);
+    final PeriodPartialLoadRule periodRule = (PeriodPartialLoadRule) parsed;
+    Assertions.assertEquals(CannotMatchBehavior.FULL_LOAD, periodRule.getOnCannotMatch());
+  }
+
+  @Test
+  void testIntervalSerde() throws Exception
+  {
+    // Round trip of an interval-based partial rule that uses a wildcard matcher and FALL_THROUGH.
+    final PartialLoadMatcher userProjections = new WildcardProjectionPartialLoadMatcher(List.of("user_*"), null);
+    final IntervalPartialLoadRule original = new IntervalPartialLoadRule(
+        Intervals.of("2024-01-01/2024-02-01"),
+        tier(1),
+        null,
+        userProjections,
+        CannotMatchBehavior.FALL_THROUGH
+    );
+    final String serialized = OBJECT_MAPPER.writeValueAsString(original);
+    final Rule roundTripped = OBJECT_MAPPER.readValue(serialized, Rule.class);
+    Assertions.assertEquals(original, roundTripped);
+    Assertions.assertInstanceOf(IntervalPartialLoadRule.class, roundTripped);
+  }
+
+  @Test
+  void testForeverSerde() throws Exception
+  {
+    // Round trip of the forever variant through the Rule base type.
+    final ForeverPartialLoadRule original =
+        new ForeverPartialLoadRule(tier(1), null, exact("a"), CannotMatchBehavior.FULL_LOAD);
+    final String serialized = OBJECT_MAPPER.writeValueAsString(original);
+    final Rule roundTripped = OBJECT_MAPPER.readValue(serialized, Rule.class);
+    Assertions.assertEquals(original, roundTripped);
+    Assertions.assertInstanceOf(ForeverPartialLoadRule.class, roundTripped);
+  }
+
+  @Test
+  void testForeverAlwaysAppliesToTimeWindow()
+  {
+    // A forever rule with FULL_LOAD applies even to an old segment that has no projections.
+    final DataSegment oldSegment = segmentWithProjections(OUT_OF_WINDOW, null);
+    final ForeverPartialLoadRule foreverRule =
+        new ForeverPartialLoadRule(tier(1), null, exact("a"), CannotMatchBehavior.FULL_LOAD);
+    Assertions.assertTrue(foreverRule.appliesTo(oldSegment, NOW));
+  }
+
+  @Test
+  void testRunForwardsToReplicateSegment()
+  {
+    // run() must issue exactly one replicate request carrying the rule's tier map,
+    // and no broadcast or delete requests.
+    final PeriodPartialLoadRule twoReplicaRule =
+        new PeriodPartialLoadRule(new Period("P30D"), false, tier(2), null, exact("a"), null);
+    final DataSegment recentSegment = segmentWithProjections(IN_WINDOW, List.of("a", "b"));
+    final RecordingHandler recorder = new RecordingHandler();
+
+    twoReplicaRule.run(recentSegment, recorder);
+
+    Assertions.assertEquals(1, recorder.replicateCalls);
+    Assertions.assertEquals(0, recorder.broadcastCalls);
+    Assertions.assertEquals(0, recorder.deleteCalls);
+    Assertions.assertEquals(tier(2), recorder.lastTierToReplicaCount);
+  }
+
+  @Test
+  void testPeriodEquals()
+  {
+    // Checks the equals/hashCode contract of PeriodPartialLoadRule with EqualsVerifier. The listed fields are
+    // declared non-null to the verifier, shouldSegmentBeLoaded is left out of the comparison, and equals is
+    // expected to compare classes with getClass().
+    EqualsVerifier.forClass(PeriodPartialLoadRule.class)
+                  .withNonnullFields("tieredReplicants", "matcher", 
"onCannotMatch")
+                  .withIgnoredFields("shouldSegmentBeLoaded")
+                  .usingGetClass()
+                  .verify();
+  }
+
+  @Test
+  void testIntervalEquals()
+  {
+    // Checks the equals/hashCode contract of IntervalPartialLoadRule with EqualsVerifier. The listed fields are
+    // declared non-null to the verifier, shouldSegmentBeLoaded is left out of the comparison, and equals is
+    // expected to compare classes with getClass().
+    EqualsVerifier.forClass(IntervalPartialLoadRule.class)
+                  .withNonnullFields("tieredReplicants", "matcher", 
"onCannotMatch")
+                  .withIgnoredFields("shouldSegmentBeLoaded")
+                  .usingGetClass()
+                  .verify();
+  }
+
+  @Test
+  void testForeverEquals()
+  {
+    // Checks the equals/hashCode contract of ForeverPartialLoadRule with EqualsVerifier. The listed fields are
+    // declared non-null to the verifier, shouldSegmentBeLoaded is left out of the comparison, and equals is
+    // expected to compare classes with getClass().
+    EqualsVerifier.forClass(ForeverPartialLoadRule.class)
+                  .withNonnullFields("tieredReplicants", "matcher", 
"onCannotMatch")
+                  .withIgnoredFields("shouldSegmentBeLoaded")
+                  .usingGetClass()
+                  .verify();
+  }
+
+  /**
+   * Creates a zero-size test segment of datasource "test" over {@code interval}, with a local load spec and the
+   * given projection names. {@code projections} may be null. The version is the current UTC time, so segments
+   * from separate calls are not guaranteed to share an id.
+   */
+  private static DataSegment segmentWithProjections(Interval interval, List<String> projections)
+  {
+    final String version = DateTimes.nowUtc().toString();
+    final SegmentId id = SegmentId.of("test", interval, version, new NumberedShardSpec(0, 0));
+    final Map<String, Object> localLoadSpec = Map.of("type", "local", "path", "/var/druid/segments/foo");
+    return DataSegment.builder(id)
+                      .loadSpec(localLoadSpec)
+                      .projections(projections)
+                      .size(0)
+                      .build();
+  }
+
+  /**
+   * Shorthand for an {@link ExactProjectionPartialLoadMatcher} over the given projection names.
+   */
+  private static PartialLoadMatcher exact(String... names)
+  {
+    final List<String> projectionNames = Arrays.asList(names);
+    return new ExactProjectionPartialLoadMatcher(projectionNames);
+  }
+
+  /**
+   * Replica map that places {@code n} replicas on the default tier.
+   */
+  private static Map<String, Integer> tier(int n)
+  {
+    final String defaultTier = DruidServer.DEFAULT_TIER;
+    return Map.of(defaultTier, n);
+  }
+
+  /**
+   * Hand-written {@link SegmentActionHandler} that counts how often each callback is invoked and remembers the
+   * replica map of the latest replicate request, so tests can assert on what a rule asked for.
+   */
+  private static class RecordingHandler implements SegmentActionHandler
+  {
+    // One invocation counter per callback.
+    int replicateCalls;
+    int broadcastCalls;
+    int deleteCalls;
+    // Argument of the most recent replicateSegment call; null until the first call.
+    Map<String, Integer> lastTierToReplicaCount;
+
+    @Override
+    public void replicateSegment(DataSegment segment, Map<String, Integer> tierToReplicaCount)
+    {
+      this.lastTierToReplicaCount = tierToReplicaCount;
+      this.replicateCalls += 1;
+    }
+
+    @Override
+    public void deleteSegment(DataSegment segment)
+    {
+      this.deleteCalls += 1;
+    }
+
+    @Override
+    public void broadcastSegment(DataSegment segment)
+    {
+      this.broadcastCalls += 1;
+    }
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcherTest.java
 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcherTest.java
new file mode 100644
index 00000000000..0f1a12e85e8
--- /dev/null
+++ 
b/server/src/test/java/org/apache/druid/server/coordinator/rules/WildcardProjectionPartialLoadMatcherTest.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.server.coordinator.rules;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.apache.druid.error.DruidException;
+import org.apache.druid.error.DruidExceptionMatcher;
+import org.apache.druid.jackson.DefaultObjectMapper;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
+import org.hamcrest.MatcherAssert;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class WildcardProjectionPartialLoadMatcherTest
+{
+  // JSON mapper used by the serde tests.
+  private static final ObjectMapper OBJECT_MAPPER = new DefaultObjectMapper();
+
+  // Load spec given to every test segment, and passed directly to match() in the projection-agnostic test.
+  private static final Map<String, Object> BASE_LOAD_SPEC = Map.of(
+      "type", "local",
+      "path", "/var/druid/segments/foo"
+  );
+
+  // Builder reused by segmentWithProjections(), which sets the projections on it before each build().
+  // The segment version is the time at which this class was initialized.
+  private static final DataSegment.Builder BUILDER = DataSegment
+      .builder(
+          SegmentId.of(
+              "test",
+              Intervals.of("2024-01-01/2024-02-01"),
+              DateTimes.nowUtc().toString(),
+              new NumberedShardSpec(0, 0)
+          )
+      )
+      .loadSpec(BASE_LOAD_SPEC)
+      .size(0);
+
+  @Test
+  void testConstructorRejectsNullPatterns()
+  {
+    // A null include-pattern list is invalid input.
+    final DruidException thrown = Assertions.assertThrows(
+        DruidException.class,
+        () -> new WildcardProjectionPartialLoadMatcher(null, null)
+    );
+    MatcherAssert.assertThat(
+        thrown,
+        DruidExceptionMatcher.invalidInput().expectMessageContains("patterns must not be null or empty")
+    );
+  }
+
+  @Test
+  void testConstructorRejectsEmptyPatterns()
+  {
+    // An empty include-pattern list is rejected with the same message as a null one.
+    final DruidException thrown = Assertions.assertThrows(
+        DruidException.class,
+        () -> new WildcardProjectionPartialLoadMatcher(Collections.emptyList(), null)
+    );
+    MatcherAssert.assertThat(
+        thrown,
+        DruidExceptionMatcher.invalidInput().expectMessageContains("patterns must not be null or empty")
+    );
+  }
+
+  @Test
+  void testMatchStarMatchesAll()
+  {
+    // A lone "*" selects every projection the segment declares.
+    final DataSegment segment = segmentWithProjections(List.of("alpha", "beta", "gamma"));
+    final PartialLoadMatcher.MatchResult matched =
+        new WildcardProjectionPartialLoadMatcher(List.of("*"), null).match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("alpha", "beta", "gamma"), selected);
+  }
+
+  @Test
+  void testMatchPrefixGlob()
+  {
+    // Trailing "*": only names starting with "user_" are selected.
+    final DataSegment segment = segmentWithProjections(List.of("user_daily", "user_hourly", "session_daily"));
+    final PartialLoadMatcher.MatchResult matched =
+        new WildcardProjectionPartialLoadMatcher(List.of("user_*"), null).match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("user_daily", "user_hourly"), selected);
+  }
+
+  @Test
+  void testMatchSingleCharGlob()
+  {
+    // "?" stands for exactly one character: "pxx" is too long and "q1" has the wrong first letter.
+    final DataSegment segment = segmentWithProjections(List.of("p1", "p2", "pxx", "q1"));
+    final PartialLoadMatcher.MatchResult matched =
+        new WildcardProjectionPartialLoadMatcher(List.of("p?"), null).match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("p1", "p2"), selected);
+  }
+
+  @Test
+  void testMatchSuffixGlob()
+  {
+    // Leading "*": the whole name must match, so only names ending in "_daily" qualify. The expected list is in
+    // alphabetical order, which differs from the order in which the segment lists its projections.
+    final DataSegment segment = segmentWithProjections(List.of("user_daily", "session_daily", "user_hourly"));
+    final PartialLoadMatcher.MatchResult matched =
+        new WildcardProjectionPartialLoadMatcher(List.of("*_daily"), null).match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("session_daily", "user_daily"), selected);
+  }
+
+  @Test
+  void testMatchMidStringGlob()
+  {
+    // "*" between two literal fragments: both the "user_" prefix and the "_v2" suffix must be present.
+    final DataSegment segment = segmentWithProjections(
+        List.of("user_daily_v2", "user_hourly_v2", "user_daily", "session_daily_v2")
+    );
+    final PartialLoadMatcher.MatchResult matched =
+        new WildcardProjectionPartialLoadMatcher(List.of("user_*_v2"), null).match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("user_daily_v2", "user_hourly_v2"), selected);
+  }
+
+  @Test
+  void testMatchIsCaseSensitive()
+  {
+    // Names differing only in letter case are distinct: "User_*" must not pick up "user_daily".
+    final DataSegment segment = segmentWithProjections(List.of("user_daily", "User_daily"));
+    final PartialLoadMatcher.MatchResult matched =
+        new WildcardProjectionPartialLoadMatcher(List.of("User_*"), null).match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("User_daily"), selected);
+  }
+
+  @Test
+  void testMatchReturnsNullWhenNoPatternsHit()
+  {
+    // The segment does have projections, but no pattern matches any of them. This is a different path from the
+    // short-circuit taken for segments that carry no projection information at all.
+    final DataSegment segment = segmentWithProjections(List.of("session_daily", "session_hourly", "other"));
+    final WildcardProjectionPartialLoadMatcher userOnly =
+        new WildcardProjectionPartialLoadMatcher(List.of("user_*"), null);
+    Assertions.assertNull(userOnly.match(segment, segment.getLoadSpec()));
+  }
+
+  @Test
+  void testMultiplePatternsUnioned()
+  {
+    // Include patterns are OR'd: a name is selected when any one pattern matches it.
+    final DataSegment segment = segmentWithProjections(
+        List.of("user_daily", "session_daily", "session_hourly", "other")
+    );
+    final WildcardProjectionPartialLoadMatcher eitherPattern =
+        new WildcardProjectionPartialLoadMatcher(List.of("user_*", "session_d*"), null);
+    final PartialLoadMatcher.MatchResult matched = eitherPattern.match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("session_daily", "user_daily"), selected);
+  }
+
+  @Test
+  void testReturnsNullForProjectionAgnosticSegment()
+  {
+    // Even a match-everything pattern yields no result when the segment lists no projections,
+    // whether the list is null or empty.
+    final WildcardProjectionPartialLoadMatcher matchAll =
+        new WildcardProjectionPartialLoadMatcher(List.of("*"), null);
+    final DataSegment nullProjections = segmentWithProjections(null);
+    Assertions.assertNull(matchAll.match(nullProjections, BASE_LOAD_SPEC));
+    final DataSegment emptyProjections = segmentWithProjections(Collections.emptyList());
+    Assertions.assertNull(matchAll.match(emptyProjections, BASE_LOAD_SPEC));
+  }
+
+  @Test
+  void testEscapesRegexMetachars()
+  {
+    // "." is an ordinary character in a glob, so "a.b" must not match "axb" the way a raw regex would.
+    final DataSegment segment = segmentWithProjections(List.of("a.b", "axb"));
+    final PartialLoadMatcher.MatchResult matched =
+        new WildcardProjectionPartialLoadMatcher(List.of("a.b"), null).match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("a.b"), selected);
+  }
+
+  @Test
+  void testEscapesAsterisk()
+  {
+    // The Java literal "foo\\*bar" is the pattern foo\*bar. The backslash-escaped asterisk stands for a literal
+    // '*' rather than the glob wildcard, so only the name that is exactly "foo*bar" is selected.
+    final DataSegment segment = segmentWithProjections(List.of("foo*bar", "fooXYZbar", "foo*bar*baz"));
+    final PartialLoadMatcher.MatchResult matched =
+        new WildcardProjectionPartialLoadMatcher(List.of("foo\\*bar"), null).match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("foo*bar"), selected);
+  }
+
+  @Test
+  void testEscapesQuestionMark()
+  {
+    // The Java literal "foo\\?bar" is the pattern foo\?bar. The escaped question mark stands for a literal '?'
+    // rather than the single-character wildcard, so "fooXbar" is not selected.
+    final DataSegment segment = segmentWithProjections(List.of("foo?bar", "fooXbar"));
+    final PartialLoadMatcher.MatchResult matched =
+        new WildcardProjectionPartialLoadMatcher(List.of("foo\\?bar"), null).match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("foo?bar"), selected);
+  }
+
+  @Test
+  void testEscapesBackslash()
+  {
+    // The Java literal "foo\\\\bar" is the pattern foo\\bar, in which the doubled backslash stands for one
+    // literal backslash. It therefore selects the name foo\bar (Java literal "foo\\bar") and not "foobar".
+    final DataSegment segment = segmentWithProjections(List.of("foo\\bar", "foobar"));
+    final PartialLoadMatcher.MatchResult matched =
+        new WildcardProjectionPartialLoadMatcher(List.of("foo\\\\bar"), null).match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("foo\\bar"), selected);
+  }
+
+  @Test
+  void testRejectsTrailingBackslash()
+  {
+    // An include pattern ending in a lone backslash has nothing to escape and is invalid input.
+    final DruidException thrown = Assertions.assertThrows(
+        DruidException.class,
+        () -> new WildcardProjectionPartialLoadMatcher(List.of("foo\\"), null)
+    );
+    MatcherAssert.assertThat(
+        thrown,
+        DruidExceptionMatcher.invalidInput().expectMessageContains("ends with an unescaped backslash")
+    );
+  }
+
+  @Test
+  void testRejectsTrailingBackslashInExcludePatterns()
+  {
+    // The trailing-backslash check also covers exclude patterns.
+    final DruidException thrown = Assertions.assertThrows(
+        DruidException.class,
+        () -> new WildcardProjectionPartialLoadMatcher(List.of("*"), List.of("foo\\"))
+    );
+    MatcherAssert.assertThat(
+        thrown,
+        DruidExceptionMatcher.invalidInput().expectMessageContains("ends with an unescaped backslash")
+    );
+  }
+
+  @Test
+  void testSerde() throws Exception
+  {
+    // JSON round trip through the PartialLoadMatcher base type, without exclude patterns.
+    final WildcardProjectionPartialLoadMatcher original =
+        new WildcardProjectionPartialLoadMatcher(List.of("user_*", "p?"), null);
+    final String serialized = OBJECT_MAPPER.writeValueAsString(original);
+    final PartialLoadMatcher roundTripped = OBJECT_MAPPER.readValue(serialized, PartialLoadMatcher.class);
+    Assertions.assertEquals(original, roundTripped);
+    Assertions.assertInstanceOf(WildcardProjectionPartialLoadMatcher.class, roundTripped);
+  }
+
+  @Test
+  void testExcludeLiteralRemovesMatchedName()
+  {
+    // Scenario: a long-retention rule loads every user_* projection except user_daily, which is meant to live
+    // only on a shorter-retention exact-match rule. A literal name is simply a glob without wildcards, so
+    // excludePatterns covers this case too.
+    final DataSegment segment = segmentWithProjections(List.of("user_daily", "user_hourly", "user_weekly"));
+    final WildcardProjectionPartialLoadMatcher allButDaily =
+        new WildcardProjectionPartialLoadMatcher(List.of("user_*"), List.of("user_daily"));
+    final PartialLoadMatcher.MatchResult matched = allButDaily.match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("user_hourly", "user_weekly"), selected);
+  }
+
+  @Test
+  void testExcludePatternRemovesMatchingNames()
+  {
+    // Broad rule: load everything except the names that a different, more specific rule takes care of.
+    final DataSegment segment = segmentWithProjections(
+        List.of("user_daily", "user_hourly", "session_daily", "other")
+    );
+    final WildcardProjectionPartialLoadMatcher allButUser =
+        new WildcardProjectionPartialLoadMatcher(List.of("*"), List.of("user_*"));
+    final PartialLoadMatcher.MatchResult matched = allButUser.match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("other", "session_daily"), selected);
+  }
+
+  @Test
+  void testMultipleExcludePatternsUnioned()
+  {
+    // Exclude patterns are OR'd as well: matching any single one removes the name.
+    final DataSegment segment = segmentWithProjections(
+        List.of("user_daily", "session_temp_x", "session_daily", "other")
+    );
+    final WildcardProjectionPartialLoadMatcher twoExcludes =
+        new WildcardProjectionPartialLoadMatcher(List.of("*"), List.of("user_*", "session_temp_*"));
+    final PartialLoadMatcher.MatchResult matched = twoExcludes.match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("other", "session_daily"), selected);
+  }
+
+  @Test
+  void testExcludeNotMatchedIsNoop()
+  {
+    // The exclude pattern matches none of the included names, so the selection is unchanged.
+    final DataSegment segment = segmentWithProjections(List.of("user_daily", "user_hourly", "session_daily"));
+    final WildcardProjectionPartialLoadMatcher disjointExclude =
+        new WildcardProjectionPartialLoadMatcher(List.of("user_*"), List.of("session_*"));
+    final PartialLoadMatcher.MatchResult matched = disjointExclude.match(segment, segment.getLoadSpec());
+    Assertions.assertNotNull(matched);
+    final Object selected = matched.wrappedLoadSpec().get("projections");
+    Assertions.assertEquals(List.of("user_daily", "user_hourly"), selected);
+  }
+
+  @Test
+  void testExcludeAllMatchedReturnsNull()
+  {
+    // When the excludes remove every included name, nothing is left and the matcher reports "does not match".
+    final DataSegment segment = segmentWithProjections(List.of("user_daily", "user_hourly"));
+    final WildcardProjectionPartialLoadMatcher selfCancelling =
+        new WildcardProjectionPartialLoadMatcher(List.of("user_*"), List.of("user_*"));
+    Assertions.assertNull(selfCancelling.match(segment, segment.getLoadSpec()));
+  }
+
+  @Test
+  void testExcludeChangesFingerprint()
+  {
+    // Same segment and include patterns, evaluated once without and once with an exclusion. The two results
+    // select different projections, so their fingerprints must differ.
+    DataSegment segment = segmentWithProjections(List.of("user_daily", "user_hourly"));
+    PartialLoadMatcher.MatchResult withoutExcludes = new WildcardProjectionPartialLoadMatcher(
+        List.of("user_*"),
+        null
+    ).match(segment, segment.getLoadSpec());
+    PartialLoadMatcher.MatchResult withExcludes = new WildcardProjectionPartialLoadMatcher(
+        List.of("user_*"),
+        List.of("user_daily")
+    ).match(segment, segment.getLoadSpec());
+    // match() may return null; assert first, as the other tests in this class do, so that a regression surfaces
+    // as an assertion failure rather than a NullPointerException.
+    Assertions.assertNotNull(withoutExcludes);
+    Assertions.assertNotNull(withExcludes);
+    Assertions.assertNotEquals(withoutExcludes.fingerprint(), withExcludes.fingerprint());
+  }
+
+  @Test
+  void testSerdeWithExcludePatterns() throws Exception
+  {
+    // JSON round trip that must also preserve the exclude patterns.
+    final WildcardProjectionPartialLoadMatcher original =
+        new WildcardProjectionPartialLoadMatcher(List.of("*"), List.of("user_*"));
+    final String serialized = OBJECT_MAPPER.writeValueAsString(original);
+    final PartialLoadMatcher roundTripped = OBJECT_MAPPER.readValue(serialized, PartialLoadMatcher.class);
+    Assertions.assertEquals(original, roundTripped);
+    final WildcardProjectionPartialLoadMatcher wildcard = (WildcardProjectionPartialLoadMatcher) roundTripped;
+    Assertions.assertEquals(List.of("user_*"), wildcard.getExcludePatterns());
+  }
+
+  @Test
+  void testEquals()
+  {
+    // Checks the equals/hashCode contract of WildcardProjectionPartialLoadMatcher with EqualsVerifier. The two
+    // compiled-pattern fields are left out of the comparison, and equals is expected to compare classes with
+    // getClass().
+    EqualsVerifier.forClass(WildcardProjectionPartialLoadMatcher.class)
+                  .withIgnoredFields("compiledPatterns", 
"compiledExcludePatterns")
+                  .usingGetClass()
+                  .verify();
+  }
+
+  /**
+   * Builds a test segment with the given projection names from the shared {@code BUILDER}.
+   * {@code projections} may be null or empty to model a segment without projection information.
+   */
+  private static DataSegment segmentWithProjections(List<String> projections)
+  {
+    final DataSegment.Builder configured = BUILDER.projections(projections);
+    return configured.build();
+  }
+}
diff --git 
a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
 
b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
index 183d785e979..8b117834d36 100644
--- 
a/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
+++ 
b/server/src/test/java/org/apache/druid/server/http/DataSourcesResourceTest.java
@@ -54,8 +54,11 @@ import org.apache.druid.segment.TestDataSource;
 import org.apache.druid.server.coordination.DruidServerMetadata;
 import org.apache.druid.server.coordination.ServerType;
 import org.apache.druid.server.coordinator.DruidCoordinator;
+import org.apache.druid.server.coordinator.rules.CannotMatchBehavior;
+import 
org.apache.druid.server.coordinator.rules.ExactProjectionPartialLoadMatcher;
 import org.apache.druid.server.coordinator.rules.IntervalDropRule;
 import org.apache.druid.server.coordinator.rules.IntervalLoadRule;
+import org.apache.druid.server.coordinator.rules.IntervalPartialLoadRule;
 import org.apache.druid.server.coordinator.rules.Rule;
 import org.apache.druid.server.security.Access;
 import org.apache.druid.server.security.Action;
@@ -651,11 +654,21 @@ public class DataSourcesResourceTest
   @Test
   public void testIsHandOffComplete()
   {
+    // Cascade with only interval-based rules (IntervalLoadRule + IntervalDropRule). The lazy supplier never fires
+    // since no segment-aware rule shows up, so no metadata snapshot lookups should happen.
     MetadataRuleManager databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
     Rule loadRule = new IntervalLoadRule(Intervals.of("2013-01-02T00:00:00Z/2013-01-03T00:00:00Z"), null, null);
     Rule dropRule = new IntervalDropRule(Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z"));
     DataSourcesResource dataSourcesResource =
-        new DataSourcesResource(inventoryView, null, databaseRuleManager, null, null, null, auditManager);
+        new DataSourcesResource(
+            inventoryView,
+            segmentsMetadataManager,
+            databaseRuleManager,
+            null,
+            null,
+            null,
+            auditManager
+        );
 
     // test dropped
     EasyMock.expect(databaseRuleManager.getRulesWithDefault(TestDataSource.WIKI))
@@ -717,6 +730,221 @@ public class DataSourcesResourceTest
     EasyMock.verify(inventoryView, databaseRuleManager);
   }
 
+  @Test
+  public void testIsHandOffCompleteSegmentNotInMetadataReturnsTrue()
+  {
+    // Cascade contains a partial rule (segment-aware), so the lazy supplier fires when the cascade reaches it.
+    // The segment isn't published in metadata and isn't found after a forced refresh, so the response is true.
+    MetadataRuleManager databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
+    Interval ruleInterval = Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z");
+    Rule partialRule = new IntervalPartialLoadRule(
+        ruleInterval,
+        null,
+        null,
+        new ExactProjectionPartialLoadMatcher(ImmutableList.of("user_daily")),
+        CannotMatchBehavior.FALL_THROUGH
+    );
+    DataSourcesResource dataSourcesResource =
+        new DataSourcesResource(
+            inventoryView,
+            segmentsMetadataManager,
+            databaseRuleManager,
+            null,
+            null,
+            null,
+            auditManager
+        );
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault(TestDataSource.WIKI))
+            .andReturn(ImmutableList.of(partialRule))
+            .once();
+    EasyMock.expect(segmentsMetadataManager.getRecentDataSourcesSnapshot())
+            .andReturn(DataSourcesSnapshot.fromUsedSegments(ImmutableList.of()))
+            .once();
+    EasyMock.expect(segmentsMetadataManager.forceUpdateDataSourcesSnapshot())
+            .andReturn(DataSourcesSnapshot.fromUsedSegments(ImmutableList.of()))
+            .once();
+    EasyMock.replay(databaseRuleManager, segmentsMetadataManager);
+
+    String interval = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z";
+    Response response = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1");
+    Assert.assertTrue((boolean) response.getEntity());
+
+    EasyMock.verify(databaseRuleManager, segmentsMetadataManager);
+  }
+
+  @Test
+  public void testIsHandOffCompleteForcesMetadataRefreshOnSnapshotMiss()
+  {
+    // Cascade contains a partial rule, so the segment supplier fires. The cached snapshot misses; the forced refresh
+    // finds the segment, and the rule cascade evaluates against it.
+    MetadataRuleManager databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
+    Interval ruleInterval = Intervals.of("2013-01-01T00:00:00Z/2013-01-02T00:00:00Z");
+    Rule partialRule = new IntervalPartialLoadRule(
+        ruleInterval,
+        null,
+        null,
+        new ExactProjectionPartialLoadMatcher(ImmutableList.of("user_daily")),
+        CannotMatchBehavior.FULL_LOAD
+    );
+    DataSourcesResource dataSourcesResource =
+        new DataSourcesResource(
+            inventoryView,
+            segmentsMetadataManager,
+            databaseRuleManager,
+            null,
+            null,
+            null,
+            auditManager
+        );
+    String interval = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z";
+    DataSegment segment = buildHandoffSegment(TestDataSource.WIKI, Intervals.of(interval), "v1", 1);
+
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault(TestDataSource.WIKI))
+            .andReturn(ImmutableList.of(partialRule))
+            .once();
+    EasyMock.expect(segmentsMetadataManager.getRecentDataSourcesSnapshot())
+            .andReturn(DataSourcesSnapshot.fromUsedSegments(ImmutableList.of()))
+            .once();
+    EasyMock.expect(segmentsMetadataManager.forceUpdateDataSourcesSnapshot())
+            .andReturn(DataSourcesSnapshot.fromUsedSegments(ImmutableList.of(segment)))
+            .once();
+    EasyMock.expect(inventoryView.getTimeline(new TableDataSource(TestDataSource.WIKI)))
+            .andReturn(null)
+            .once();
+    EasyMock.replay(inventoryView, databaseRuleManager, segmentsMetadataManager);
+
+    Response response = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1");
+    Assert.assertFalse((boolean) response.getEntity());
+
+    EasyMock.verify(inventoryView, databaseRuleManager, segmentsMetadataManager);
+  }
+
+  @Test
+  public void testIsHandOffCompleteWithPartialLoadRuleFallThrough()
+  {
+    // A FALL_THROUGH partial rule whose matcher does not resolve on the segment (the projection it asks for is not
+    // present) should not halt the cascade. The next rule (drop) catches the segment, so the response is true.
+    MetadataRuleManager databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
+    Interval ruleInterval = Intervals.of("2013-01-01T00:00:00Z/2013-01-03T00:00:00Z");
+    Rule partialRule = new IntervalPartialLoadRule(
+        ruleInterval,
+        null,
+        null,
+        new ExactProjectionPartialLoadMatcher(ImmutableList.of("user_daily")),
+        CannotMatchBehavior.FALL_THROUGH
+    );
+    Rule dropRule = new IntervalDropRule(ruleInterval);
+    DataSourcesResource dataSourcesResource =
+        new DataSourcesResource(
+            inventoryView,
+            segmentsMetadataManager,
+            databaseRuleManager,
+            null,
+            null,
+            null,
+            auditManager
+        );
+
+    String interval = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z";
+    // Segment exposes projections [other_daily] which the partial rule's matcher (asking for "user_daily") cannot
+    // resolve, so the partial rule falls through and the drop rule catches it.
+    DataSegment segment = buildHandoffSegment(
+        TestDataSource.WIKI,
+        Intervals.of(interval),
+        "v1",
+        1,
+        ImmutableList.of("other_daily")
+    );
+
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault(TestDataSource.WIKI))
+            .andReturn(ImmutableList.of(partialRule, dropRule))
+            .once();
+    EasyMock.expect(segmentsMetadataManager.getRecentDataSourcesSnapshot())
+            .andReturn(DataSourcesSnapshot.fromUsedSegments(ImmutableList.of(segment)))
+            .once();
+    EasyMock.replay(databaseRuleManager, segmentsMetadataManager);
+
+    Response response = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1");
+    Assert.assertTrue((boolean) response.getEntity());
+
+    EasyMock.verify(databaseRuleManager, segmentsMetadataManager);
+  }
+
+  @Test
+  public void testIsHandOffCompleteWithPartialLoadRuleMatcherResolves()
+  {
+    // A partial rule whose matcher does resolve on the segment applies (loads), so the segment is "still waiting for
+    // handoff", the response is false until the timeline reflects it.
+    MetadataRuleManager databaseRuleManager = EasyMock.createMock(MetadataRuleManager.class);
+    Interval ruleInterval = Intervals.of("2013-01-01T00:00:00Z/2013-01-03T00:00:00Z");
+    Rule partialRule = new IntervalPartialLoadRule(
+        ruleInterval,
+        null,
+        null,
+        new ExactProjectionPartialLoadMatcher(ImmutableList.of("user_daily")),
+        CannotMatchBehavior.FALL_THROUGH
+    );
+    Rule dropRule = new IntervalDropRule(ruleInterval);
+    DataSourcesResource dataSourcesResource =
+        new DataSourcesResource(
+            inventoryView,
+            segmentsMetadataManager,
+            databaseRuleManager,
+            null,
+            null,
+            null,
+            auditManager
+        );
+
+    String interval = "2013-01-01T01:00:00Z/2013-01-01T02:00:00Z";
+    DataSegment segment = buildHandoffSegment(
+        TestDataSource.WIKI,
+        Intervals.of(interval),
+        "v1",
+        1,
+        ImmutableList.of("user_daily")
+    );
+
+    EasyMock.expect(databaseRuleManager.getRulesWithDefault(TestDataSource.WIKI))
+            .andReturn(ImmutableList.of(partialRule, dropRule))
+            .once();
+    EasyMock.expect(segmentsMetadataManager.getRecentDataSourcesSnapshot())
+            .andReturn(DataSourcesSnapshot.fromUsedSegments(ImmutableList.of(segment)))
+            .once();
+    EasyMock.expect(inventoryView.getTimeline(new TableDataSource(TestDataSource.WIKI)))
+            .andReturn(null)
+            .once();
+    EasyMock.replay(inventoryView, databaseRuleManager, segmentsMetadataManager);
+
+    Response response = dataSourcesResource.isHandOffComplete(TestDataSource.WIKI, interval, 1, "v1");
+    Assert.assertFalse((boolean) response.getEntity());
+
+    EasyMock.verify(inventoryView, databaseRuleManager, segmentsMetadataManager);
+  }
+
+  private static DataSegment buildHandoffSegment(String dataSource, Interval interval, String version, int partitionNumber)
+  {
+    return buildHandoffSegment(dataSource, interval, version, partitionNumber, null);
+  }
+
+  private static DataSegment buildHandoffSegment(
+      String dataSource,
+      Interval interval,
+      String version,
+      int partitionNumber,
+      List<String> projections
+  )
+  {
+    return DataSegment.builder()
+                      .dataSource(dataSource)
+                      .interval(interval)
+                      .version(version)
+                      .shardSpec(new NumberedShardSpec(partitionNumber, 100))
+                      .projections(projections)
+                      .size(0)
+                      .build();
+  }
+
   @Test
   public void testMarkSegmentAsUsed()
   {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to