This is an automated email from the ASF dual-hosted git repository.
gianm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new 516e3af60fc Different approach to fixing the specificity problem for
applying skip offsets (#19389)
516e3af60fc is described below
commit 516e3af60fc82ef2ee0eb883b422ac2b72f576fd
Author: Lucas Capistrant <[email protected]>
AuthorDate: Wed Apr 29 14:19:26 2026 -0500
Different approach to fixing the specificity problem for applying skip
offsets (#19389)
---
.../compact/CascadingReindexingTemplate.java | 25 ++++++++++++++++------
.../compact/CascadingReindexingTemplateTest.java | 24 ++++++++++++++-------
2 files changed, 34 insertions(+), 15 deletions(-)
diff --git
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java
index 7fcb0fee22a..c50c7ec7402 100644
---
a/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java
+++
b/indexing-service/src/main/java/org/apache/druid/indexing/compact/CascadingReindexingTemplate.java
@@ -354,7 +354,8 @@ public class CascadingReindexingTemplate implements
CompactionJobTemplate, DataS
return Collections.emptyList();
}
- for (IntervalPartitioningInfo intervalInfo : searchIntervals) {
+ for (int i = 0; i < searchIntervals.size(); i++) {
+ IntervalPartitioningInfo intervalInfo = searchIntervals.get(i);
Interval reindexingInterval = intervalInfo.getInterval();
if (!reindexingInterval.overlaps(adjustedTimelineInterval)) {
@@ -363,14 +364,24 @@ public class CascadingReindexingTemplate implements
CompactionJobTemplate, DataS
continue;
}
- // Skip intervals that extend past the skip offset boundary (not just
data boundary)
- // This preserves granularity alignment and ensures intervals exist in
synthetic timeline
- // Only apply this when a skip offset is actually configured
+ // Skip offsets, if configured, can result in needing to truncate a
search interval. If the truncation makes the interval invalid, skip it.
if ((skipOffsetFromNow != null || skipOffsetFromLatest != null) &&
intervalEndsAfter(reindexingInterval,
adjustedTimelineInterval.getEnd())) {
- LOG.debug("Search interval[%s] extends past skip offset boundary[%s],
skipping to preserve alignment",
- reindexingInterval, adjustedTimelineInterval.getEnd());
- continue;
+
+ DateTime alignedEnd =
intervalInfo.getGranularity().bucketStart(adjustedTimelineInterval.getEnd());
+ if (!alignedEnd.isAfter(reindexingInterval.getStart())) {
+ LOG.debug("Search interval[%s] is entirely within skip offset,
skipping", reindexingInterval);
+ continue;
+ }
+ reindexingInterval = new Interval(reindexingInterval.getStart(),
alignedEnd);
+ // Replace the entry in searchIntervals so the downstream
synthetic-timeline lookup
+ // in ReindexingConfigBuilder matches the truncated interval.
+ intervalInfo = new IntervalPartitioningInfo(
+ reindexingInterval,
+ intervalInfo.getSourceRule(),
+ intervalInfo.isRuleSynthetic()
+ );
+ searchIntervals.set(i, intervalInfo);
}
InlineSchemaDataSourceCompactionConfig.Builder builder =
createBaseBuilder();
diff --git
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
index 98e99810805..c3325725f16 100644
---
a/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
+++
b/indexing-service/src/test/java/org/apache/druid/indexing/compact/CascadingReindexingTemplateTest.java
@@ -384,7 +384,7 @@ public class CascadingReindexingTemplateTest extends
InitializedNullHandlingTest
}
@Test
- public void
test_createCompactionJobs_withSkipOffsetFromLatest_skipsIntervalsExtendingPastOffset()
+ public void
test_createCompactionJobs_withSkipOffsetFromLatest_truncatesIntervalsExtendingPastSkipOffset()
{
DateTime referenceTime = DateTimes.of("2024-01-15T00:00:00Z");
SegmentTimeline timeline = createTestTimeline(referenceTime.minusDays(90),
referenceTime.minusDays(10));
@@ -399,9 +399,11 @@ public class CascadingReindexingTemplateTest extends
InitializedNullHandlingTest
template.createCompactionJobs(mockSource, mockParams);
List<Interval> processedIntervals = template.getProcessedIntervals();
- Assertions.assertEquals(1, processedIntervals.size());
+ Assertions.assertEquals(2, processedIntervals.size());
Assertions.assertEquals(DateTimes.MIN,
processedIntervals.get(0).getStart());
Assertions.assertEquals(referenceTime.minusDays(30),
processedIntervals.get(0).getEnd());
+ Assertions.assertEquals(referenceTime.minusDays(30),
processedIntervals.get(1).getStart());
+ Assertions.assertEquals(referenceTime.minusDays(15),
processedIntervals.get(1).getEnd());
EasyMock.verify(mockProvider, mockParams, mockSource);
}
@@ -411,7 +413,7 @@ public class CascadingReindexingTemplateTest extends
InitializedNullHandlingTest
{
DateTime referenceTime = DateTimes.of("2024-01-15T00:00:00Z");
SegmentTimeline timeline = createTestTimeline(referenceTime.minusDays(90),
referenceTime.minusDays(10));
- ReindexingRuleProvider mockProvider =
createMockProvider(List.of(Period.days(7), Period.days(30)));
+ ReindexingRuleProvider mockProvider =
createMockProvider(List.of(Period.days(3), Period.days(7), Period.days(30)));
CompactionJobParams mockParams = createMockParams(referenceTime, timeline);
DruidInputSource mockSource = createMockSource();
@@ -422,9 +424,11 @@ public class CascadingReindexingTemplateTest extends
InitializedNullHandlingTest
template.createCompactionJobs(mockSource, mockParams);
List<Interval> processedIntervals = template.getProcessedIntervals();
- Assertions.assertEquals(1, processedIntervals.size());
+ Assertions.assertEquals(2, processedIntervals.size());
Assertions.assertEquals(DateTimes.MIN,
processedIntervals.get(0).getStart());
Assertions.assertEquals(referenceTime.minusDays(30),
processedIntervals.get(0).getEnd());
+ Assertions.assertEquals(referenceTime.minusDays(30),
processedIntervals.get(1).getStart());
+ Assertions.assertEquals(referenceTime.minusDays(25),
processedIntervals.get(1).getEnd());
EasyMock.verify(mockProvider, mockParams, mockSource);
}
@@ -451,7 +455,7 @@ public class CascadingReindexingTemplateTest extends
InitializedNullHandlingTest
}
@Test
- public void
test_createCompactionJobs_withSkipOffsetFromNow_skipsIntervalsExtendingPastOffset()
+ public void
test_createCompactionJobs_withSkipOffsetFromNow_truncatesIntervalThatExtendsPastSkipOffset()
{
DateTime referenceTime = DateTimes.of("2024-01-15T00:00:00Z");
SegmentTimeline timeline = createTestTimeline(referenceTime.minusDays(90),
referenceTime.minusDays(10));
@@ -466,9 +470,11 @@ public class CascadingReindexingTemplateTest extends
InitializedNullHandlingTest
template.createCompactionJobs(mockSource, mockParams);
List<Interval> processedIntervals = template.getProcessedIntervals();
- Assertions.assertEquals(1, processedIntervals.size());
+ Assertions.assertEquals(2, processedIntervals.size());
Assertions.assertEquals(DateTimes.MIN,
processedIntervals.get(0).getStart());
Assertions.assertEquals(referenceTime.minusDays(30),
processedIntervals.get(0).getEnd());
+ Assertions.assertEquals(referenceTime.minusDays(30),
processedIntervals.get(1).getStart());
+ Assertions.assertEquals(referenceTime.minusDays(20),
processedIntervals.get(1).getEnd());
EasyMock.verify(mockProvider, mockParams, mockSource);
}
@@ -478,7 +484,7 @@ public class CascadingReindexingTemplateTest extends
InitializedNullHandlingTest
{
DateTime referenceTime = DateTimes.of("2024-01-15T00:00:00Z");
SegmentTimeline timeline = createTestTimeline(referenceTime.minusDays(90),
referenceTime.minusDays(10));
- ReindexingRuleProvider mockProvider =
createMockProvider(List.of(Period.days(7), Period.days(30)));
+ ReindexingRuleProvider mockProvider =
createMockProvider(List.of(Period.days(3), Period.days(7), Period.days(30)));
CompactionJobParams mockParams = createMockParams(referenceTime, timeline);
DruidInputSource mockSource = createMockSource();
@@ -489,9 +495,11 @@ public class CascadingReindexingTemplateTest extends
InitializedNullHandlingTest
template.createCompactionJobs(mockSource, mockParams);
List<Interval> processedIntervals = template.getProcessedIntervals();
- Assertions.assertEquals(1, processedIntervals.size());
+ Assertions.assertEquals(2, processedIntervals.size());
Assertions.assertEquals(DateTimes.MIN,
processedIntervals.get(0).getStart());
Assertions.assertEquals(referenceTime.minusDays(30),
processedIntervals.get(0).getEnd());
+ Assertions.assertEquals(referenceTime.minusDays(30),
processedIntervals.get(1).getStart());
+ Assertions.assertEquals(referenceTime.minusDays(20),
processedIntervals.get(1).getEnd());
EasyMock.verify(mockProvider, mockParams, mockSource);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]