github-advanced-security[bot] commented on code in PR #19535:
URL: https://github.com/apache/druid/pull/19535#discussion_r3333814034
##########
server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java:
##########
@@ -236,15 +279,8 @@
// clear the future so the next caller gets a fresh attempt
mountFuture.set(null);
ours.setException(t);
- if (t instanceof IOException) {
- throw (IOException) t;
- }
- if (t instanceof RuntimeException) {
- throw (RuntimeException) t;
- }
- if (t instanceof Error) {
- throw (Error) t;
- }
+ Throwables.propagateIfInstanceOf(t, IOException.class);
Review Comment:
## CodeQL / Deprecated method or constructor invocation
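Invoking [Throwables.propagateIfInstanceOf](1) should be avoided because it
has been deprecated. Use `Throwables.throwIfInstanceOf(t, IOException.class)`
instead; it behaves the same but rejects a null throwable.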
[Show more
details](https://github.com/apache/druid/security/code-scanning/11273)
##########
server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java:
##########
@@ -236,15 +279,8 @@
// clear the future so the next caller gets a fresh attempt
mountFuture.set(null);
ours.setException(t);
- if (t instanceof IOException) {
- throw (IOException) t;
- }
- if (t instanceof RuntimeException) {
- throw (RuntimeException) t;
- }
- if (t instanceof Error) {
- throw (Error) t;
- }
+ Throwables.propagateIfInstanceOf(t, IOException.class);
+ Throwables.propagateIfPossible(t);
Review Comment:
## CodeQL / Deprecated method or constructor invocation
Invoking [Throwables.propagateIfPossible](1) should be avoided because it
has been deprecated. Use `Throwables.throwIfUnchecked(t)` instead; it behaves
the same but rejects a null throwable.
[Show more
details](https://github.com/apache/druid/security/code-scanning/11274)
##########
server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java:
##########
@@ -280,87 +316,129 @@
// blocked on it. adjustReservation also runs outside entryLock:
StorageLocation.release goes
// writeLock -> entryLock (via release -> unmount), so entryLock ->
writeLock here would be a deadlock-prone
// lock-order inversion.
- entryLock.lock();
try {
- if (location != null && fileMapper != null) {
- if (!location.equals(mountLocation)) {
- throw DruidException.defensive(
- "Already mounted[%s] in location[%s] which differs from
requested[%s]",
- id,
- location.getPath(),
- mountLocation.getPath()
- );
+ entryLock.lock();
+ try {
+ if (location != null && fileMapper != null) {
+ if (!location.equals(mountLocation)) {
+ throw DruidException.defensive(
+ "Already mounted[%s] in location[%s] which differs from
requested[%s]",
+ id,
+ location.getPath(),
+ mountLocation.getPath()
+ );
+ }
+ return;
}
- return;
}
- }
- finally {
- entryLock.unlock();
- }
+ finally {
+ entryLock.unlock();
+ }
- final PartialSegmentFileMapperV10 mapper =
PartialSegmentFileMapperV10.create(
- rangeReader,
- jsonMapper,
- localCacheDir,
- targetFilename,
- externalFilenames
- );
+ final PartialSegmentFileMapperV10 mapper =
PartialSegmentFileMapperV10.create(
+ rangeReader,
+ jsonMapper,
+ localCacheDir,
+ targetFilename,
+ externalFilenames
+ );
- final long sizeToAdjust;
- try {
- final long actualSize = mapper.getOnDiskHeaderSize();
- if (actualSize > reservationEstimate) {
- throw DruidException.forPersona(DruidException.Persona.OPERATOR)
-
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
- .build(
- "Partial segment metadata for [%s] is [%d]
bytes on disk, exceeding the "
- + "configured reservation estimate of [%d]
bytes. Increase "
- +
"druid.segmentCache.virtualStorageMetadataReservationEstimate.",
- segmentId,
- actualSize,
- reservationEstimate
- );
+ final long sizeToAdjust;
+ try {
+ final long actualSize = mapper.getOnDiskHeaderSize();
+ if (actualSize > reservationEstimate) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(
+ "Partial segment metadata for [%s] is [%d]
bytes on disk, exceeding the "
+ + "configured reservation estimate of [%d]
bytes. Increase "
+ +
"druid.segmentCache.virtualStorageMetadataReservationEstimate.",
+ segmentId,
+ actualSize,
+ reservationEstimate
+ );
+ }
+ sizeToAdjust = actualSize < reservationEstimate ? actualSize : -1;
+
+ entryLock.lock();
+ try {
+ location = mountLocation;
+ fileMapper = mapper;
+ // Install (or re-install, after a previous mount/unmount cycle
terminated the prior Phaser) the
+ // reference-counted gate over cleanup. Future
acquireMetadataReference() / unmount() calls operate on this
+ // instance.
+ references.set(new
ReferenceCountingCloseableObject<Closeable>(this::doActualUnmount) {});
+ }
+ finally {
+ entryLock.unlock();
+ }
+ }
+ catch (Throwable t) {
+ // mount failed; close mmaps and delete the on-disk header files so a
retry starts clean. Mirrors the eager
+ // SegmentCacheEntry behavior: simpler to redo a small header
range-read than to reason about whatever partial
+ // on-disk state the failure left. Crash-mid-mount across JVM restarts
is still handled by the mapper's own
+ // corruption recovery when bootstrap runs at next startup; this path
covers the in-process retry case.
+ try {
+ mapper.close();
+ }
+ catch (Throwable closeError) {
+ t.addSuppressed(closeError);
+ }
+ try {
+ deleteHeaderFiles();
+ }
+ catch (Throwable deleteError) {
+ t.addSuppressed(deleteError);
+ }
+ throw t;
}
- sizeToAdjust = actualSize < reservationEstimate ? actualSize : -1;
- entryLock.lock();
+ // Only shrink the reservation if the entry is still registered with the
location. If we lost the reservation
+ // mid-mount (concurrent canceler / drop), adjustReservation would
throw; defer to the post-mount check in
+ // mount() to roll back cleanly instead.
+ if (sizeToAdjust >= 0 && (mountLocation.isReserved(id) ||
mountLocation.isWeakReserved(id))) {
+ mountLocation.adjustReservation(id, sizeToAdjust);
+ }
+
+ // Restore any bundles whose container files survived on disk for this
segment. No-op on the fresh-acquire path
+ // (the partialDir was just created and contains no container files
yet). The bundle restore needs metadata's
+ // file mapper, which is now installed, so this runs after the commit
above. On any failure, fire our own
+ // deferred-cleanup gate via unmount() to undo the file-mapper install +
delete header files; synchronous since
+ // we just installed the gate and no external caller has had a chance to
acquire a reference yet. The location
+ // reservation release stays the caller's responsibility (matches
mount's overall contract).
try {
- location = mountLocation;
- fileMapper = mapper;
- // Install (or re-install, after a previous mount/unmount cycle
terminated the prior Phaser) the
- // reference-counted gate over cleanup. Future acquireReference() /
unmount() calls operate on this instance.
- references.set(new
ReferenceCountingCloseableObject<Closeable>(this::doActualUnmount) {});
+ PartialSegmentCacheBootstrap.restoreBundlesFromDisk(this,
mountLocation);
}
- finally {
- entryLock.unlock();
+ catch (Throwable t) {
+ try {
+ unmount();
+ }
+ catch (Throwable cleanupError) {
+ t.addSuppressed(cleanupError);
+ }
+ Throwables.propagateIfInstanceOf(t, IOException.class);
+ Throwables.propagateIfPossible(t);
Review Comment:
## CodeQL / Deprecated method or constructor invocation
Invoking [Throwables.propagateIfPossible](1) should be avoided because it
has been deprecated. Use `Throwables.throwIfUnchecked(t)` instead; it behaves
the same but rejects a null throwable.
[Show more
details](https://github.com/apache/druid/security/code-scanning/11276)
##########
server/src/main/java/org/apache/druid/segment/loading/PartialSegmentMetadataCacheEntry.java:
##########
@@ -280,87 +316,129 @@
// blocked on it. adjustReservation also runs outside entryLock:
StorageLocation.release goes
// writeLock -> entryLock (via release -> unmount), so entryLock ->
writeLock here would be a deadlock-prone
// lock-order inversion.
- entryLock.lock();
try {
- if (location != null && fileMapper != null) {
- if (!location.equals(mountLocation)) {
- throw DruidException.defensive(
- "Already mounted[%s] in location[%s] which differs from
requested[%s]",
- id,
- location.getPath(),
- mountLocation.getPath()
- );
+ entryLock.lock();
+ try {
+ if (location != null && fileMapper != null) {
+ if (!location.equals(mountLocation)) {
+ throw DruidException.defensive(
+ "Already mounted[%s] in location[%s] which differs from
requested[%s]",
+ id,
+ location.getPath(),
+ mountLocation.getPath()
+ );
+ }
+ return;
}
- return;
}
- }
- finally {
- entryLock.unlock();
- }
+ finally {
+ entryLock.unlock();
+ }
- final PartialSegmentFileMapperV10 mapper =
PartialSegmentFileMapperV10.create(
- rangeReader,
- jsonMapper,
- localCacheDir,
- targetFilename,
- externalFilenames
- );
+ final PartialSegmentFileMapperV10 mapper =
PartialSegmentFileMapperV10.create(
+ rangeReader,
+ jsonMapper,
+ localCacheDir,
+ targetFilename,
+ externalFilenames
+ );
- final long sizeToAdjust;
- try {
- final long actualSize = mapper.getOnDiskHeaderSize();
- if (actualSize > reservationEstimate) {
- throw DruidException.forPersona(DruidException.Persona.OPERATOR)
-
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
- .build(
- "Partial segment metadata for [%s] is [%d]
bytes on disk, exceeding the "
- + "configured reservation estimate of [%d]
bytes. Increase "
- +
"druid.segmentCache.virtualStorageMetadataReservationEstimate.",
- segmentId,
- actualSize,
- reservationEstimate
- );
+ final long sizeToAdjust;
+ try {
+ final long actualSize = mapper.getOnDiskHeaderSize();
+ if (actualSize > reservationEstimate) {
+ throw DruidException.forPersona(DruidException.Persona.OPERATOR)
+
.ofCategory(DruidException.Category.RUNTIME_FAILURE)
+ .build(
+ "Partial segment metadata for [%s] is [%d]
bytes on disk, exceeding the "
+ + "configured reservation estimate of [%d]
bytes. Increase "
+ +
"druid.segmentCache.virtualStorageMetadataReservationEstimate.",
+ segmentId,
+ actualSize,
+ reservationEstimate
+ );
+ }
+ sizeToAdjust = actualSize < reservationEstimate ? actualSize : -1;
+
+ entryLock.lock();
+ try {
+ location = mountLocation;
+ fileMapper = mapper;
+ // Install (or re-install, after a previous mount/unmount cycle
terminated the prior Phaser) the
+ // reference-counted gate over cleanup. Future
acquireMetadataReference() / unmount() calls operate on this
+ // instance.
+ references.set(new
ReferenceCountingCloseableObject<Closeable>(this::doActualUnmount) {});
+ }
+ finally {
+ entryLock.unlock();
+ }
+ }
+ catch (Throwable t) {
+ // mount failed; close mmaps and delete the on-disk header files so a
retry starts clean. Mirrors the eager
+ // SegmentCacheEntry behavior: simpler to redo a small header
range-read than to reason about whatever partial
+ // on-disk state the failure left. Crash-mid-mount across JVM restarts
is still handled by the mapper's own
+ // corruption recovery when bootstrap runs at next startup; this path
covers the in-process retry case.
+ try {
+ mapper.close();
+ }
+ catch (Throwable closeError) {
+ t.addSuppressed(closeError);
+ }
+ try {
+ deleteHeaderFiles();
+ }
+ catch (Throwable deleteError) {
+ t.addSuppressed(deleteError);
+ }
+ throw t;
}
- sizeToAdjust = actualSize < reservationEstimate ? actualSize : -1;
- entryLock.lock();
+ // Only shrink the reservation if the entry is still registered with the
location. If we lost the reservation
+ // mid-mount (concurrent canceler / drop), adjustReservation would
throw; defer to the post-mount check in
+ // mount() to roll back cleanly instead.
+ if (sizeToAdjust >= 0 && (mountLocation.isReserved(id) ||
mountLocation.isWeakReserved(id))) {
+ mountLocation.adjustReservation(id, sizeToAdjust);
+ }
+
+ // Restore any bundles whose container files survived on disk for this
segment. No-op on the fresh-acquire path
+ // (the partialDir was just created and contains no container files
yet). The bundle restore needs metadata's
+ // file mapper, which is now installed, so this runs after the commit
above. On any failure, fire our own
+ // deferred-cleanup gate via unmount() to undo the file-mapper install +
delete header files; synchronous since
+ // we just installed the gate and no external caller has had a chance to
acquire a reference yet. The location
+ // reservation release stays the caller's responsibility (matches
mount's overall contract).
try {
- location = mountLocation;
- fileMapper = mapper;
- // Install (or re-install, after a previous mount/unmount cycle
terminated the prior Phaser) the
- // reference-counted gate over cleanup. Future acquireReference() /
unmount() calls operate on this instance.
- references.set(new
ReferenceCountingCloseableObject<Closeable>(this::doActualUnmount) {});
+ PartialSegmentCacheBootstrap.restoreBundlesFromDisk(this,
mountLocation);
}
- finally {
- entryLock.unlock();
+ catch (Throwable t) {
+ try {
+ unmount();
+ }
+ catch (Throwable cleanupError) {
+ t.addSuppressed(cleanupError);
+ }
+ Throwables.propagateIfInstanceOf(t, IOException.class);
Review Comment:
## CodeQL / Deprecated method or constructor invocation
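Invoking [Throwables.propagateIfInstanceOf](1) should be avoided because it
has been deprecated. Use `Throwables.throwIfInstanceOf(t, IOException.class)`
instead; it behaves the same but rejects a null throwable.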
[Show more
details](https://github.com/apache/druid/security/code-scanning/11275)
##########
server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerPartialAcquireTest.java:
##########
@@ -0,0 +1,527 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.fasterxml.jackson.databind.InjectableValues;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.ListBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.guice.LocalDataStorageDruidModule;
+import org.apache.druid.jackson.SegmentizerModule;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.java.util.emitter.EmittingLogger;
+import org.apache.druid.math.expr.ExprMacroTable;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.query.expression.TestExprMacroTable;
+import org.apache.druid.segment.CursorBuildSpec;
+import org.apache.druid.segment.CursorFactory;
+import org.apache.druid.segment.CursorHolder;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.PartialQueryableIndexSegment;
+import org.apache.druid.segment.Segment;
+import org.apache.druid.segment.SegmentLazyLoadFailCallback;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.TimeBoundaryInspector;
+import org.apache.druid.segment.V10TimeBoundaryInspector;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.projections.Projections;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.server.metrics.NoopServiceEmitter;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.apache.druid.utils.CloseableUtils;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+class SegmentLocalCacheManagerPartialAcquireTest
+{
+ private static final SegmentId SEGMENT_ID = SegmentId.of("test",
Intervals.of("2025/2026"), "v1", 0);
+ private static final DateTime TIME = DateTimes.of("2025-01-01");
+ private static final String AGG_BUNDLE = "dim1_metric1_sum";
+
+ private static final RowSignature ROW_SIGNATURE = RowSignature.builder()
+ .add("dim1",
ColumnType.STRING)
+
.add("metric1", ColumnType.LONG)
+ .build();
+
+ private static final List<AggregateProjectionSpec> PROJECTIONS =
Collections.singletonList(
+ AggregateProjectionSpec.builder(AGG_BUNDLE)
+ .groupingColumns(new
StringDimensionSchema("dim1"))
+ .aggregators(
+ new LongSumAggregatorFactory("_metric1_sum",
"metric1"),
+ new CountAggregatorFactory("_count")
+ )
+ .build()
+ );
+
+ private static final List<InputRow> ROWS = Arrays.asList(
+ new ListBasedInputRow(ROW_SIGNATURE, TIME,
ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", 1L)),
+ new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(1),
ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", 2L)),
+ new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(2),
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 3L)),
+ new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(3),
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 4L))
+ );
+
+ @TempDir
+ static File SHARED_TEMP_DIR;
+
+ private static File DEEP_STORAGE_DIR;
+
+ @TempDir
+ File perTestTempDir;
+
+ private ObjectMapper jsonMapper;
+ private File cacheRoot;
+ private SegmentLocalCacheManager manager;
+ private DataSegment partialSegment;
+
+ @BeforeAll
+ static void buildSegment()
+ {
+ final File tmp = new File(SHARED_TEMP_DIR, "build_" +
ThreadLocalRandom.current().nextInt());
+ DEEP_STORAGE_DIR = IndexBuilder.create()
+ .useV10()
+ .tmpDir(tmp)
+
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ IncrementalIndexSchema.builder()
+
.withDimensionsSpec(
+
DimensionsSpec.builder()
+
.setDimensions(
+
List.of(
+
new StringDimensionSchema("dim1"),
+
new LongDimensionSchema("metric1")
+
)
+
)
+
.build()
+ )
+ .withRollup(false)
+
.withMinTimestamp(TIME.getMillis())
+
.withProjections(PROJECTIONS)
+ .build()
+ )
+ .indexSpec(IndexSpec.builder()
+
.withMetadataCompression(CompressionStrategy.NONE)
+ .build())
+ .rows(ROWS)
+ .buildMMappedIndexFile();
+ EmittingLogger.registerEmitter(new NoopServiceEmitter());
+ }
+
+ @BeforeEach
+ void setup() throws IOException
+ {
+ jsonMapper = TestHelper.makeJsonMapper();
+ jsonMapper.registerSubtypes(new NamedType(LocalLoadSpec.class, "local"));
+ jsonMapper.registerModule(new SegmentizerModule());
+ jsonMapper.registerModules(new
LocalDataStorageDruidModule().getJacksonModules());
+ jsonMapper.setInjectableValues(
+ new InjectableValues.Std()
+ .addValue(LocalDataSegmentPuller.class, new
LocalDataSegmentPuller())
+ .addValue(IndexIO.class, TestHelper.getTestIndexIO(jsonMapper,
ColumnConfig.DEFAULT))
+ .addValue(ObjectMapper.class, jsonMapper)
+ .addValue(DataSegment.PruneSpecsHolder.class,
DataSegment.PruneSpecsHolder.DEFAULT)
+ .addValue(ExprMacroTable.class, TestExprMacroTable.INSTANCE)
+ );
+
+ cacheRoot = new File(perTestTempDir, "cache_" +
ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
+ FileUtils.mkdirp(cacheRoot);
+
+ final StorageLocationConfig locConfig = new
StorageLocationConfig(cacheRoot, 1024L * 1024L * 1024L, null);
+ final SegmentLoaderConfig loaderConfig = new SegmentLoaderConfig()
+ .setLocations(List.of(locConfig))
+ .setVirtualStorage(true, false);
+ final List<StorageLocation> storageLocations =
loaderConfig.toStorageLocations();
+ manager = new SegmentLocalCacheManager(
+ storageLocations,
+ loaderConfig,
+ new LeastBytesUsedStorageLocationSelectorStrategy(storageLocations),
+ TestHelper.getTestIndexIO(jsonMapper, ColumnConfig.DEFAULT),
+ jsonMapper
+ )
+ {
+ // Tripwire: the partial-aware acquire paths should be self-contained;
they must not fall through to the
+ // eager-extract acquireCachedSegment API (which is the non-partial
branch of acquireCachedSegment-style lookups).
+ @Override
+ public Optional<Segment> acquireCachedSegment(SegmentId segmentId)
+ {
+ Assertions.fail("should not fallback to acquireCachedSegment");
+ return super.acquireCachedSegment(segmentId);
+ }
+ };
+
+ // DataSegment with a LocalLoadSpec pointing at the deep storage directory
(unzipped V10 layout).
+ partialSegment = DataSegment.builder()
Review Comment:
## CodeQL / Deprecated method or constructor invocation
Invoking [DataSegment.builder](1) should be avoided because it has been
deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/11277)
##########
server/src/test/java/org/apache/druid/segment/loading/SegmentLocalCacheManagerPartialDropTest.java:
##########
@@ -0,0 +1,314 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.segment.loading;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.ListBasedInputRow;
+import org.apache.druid.data.input.impl.AggregateProjectionSpec;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.FileUtils;
+import org.apache.druid.java.util.common.Intervals;
+import org.apache.druid.query.aggregation.CountAggregatorFactory;
+import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
+import org.apache.druid.segment.IndexBuilder;
+import org.apache.druid.segment.IndexIO;
+import org.apache.druid.segment.IndexSpec;
+import org.apache.druid.segment.TestHelper;
+import org.apache.druid.segment.column.ColumnConfig;
+import org.apache.druid.segment.column.ColumnType;
+import org.apache.druid.segment.column.RowSignature;
+import org.apache.druid.segment.data.CompressionStrategy;
+import org.apache.druid.segment.file.PartialSegmentFileMapperV10;
+import org.apache.druid.segment.incremental.IncrementalIndexSchema;
+import org.apache.druid.segment.projections.Projections;
+import
org.apache.druid.segment.writeout.OffHeapMemorySegmentWriteOutMediumFactory;
+import org.apache.druid.testing.InitializedNullHandlingTest;
+import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.NoneShardSpec;
+import org.joda.time.DateTime;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
+
+/**
+ * Manager-level tests for {@code drop} / {@code removeInfoFile} on partial
segments. Partial entries (metadata +
+ * bundles) are weak cache entries; their lifecycle is owned by {@link
StorageLocation}'s SIEVE + hold-phaser
+ * machinery. Coordinator {@code drop} is effectively a no-op for the partial
branch — SIEVE drives the actual
+ * eviction once holds drop and capacity is needed. {@code removeInfoFile}
still wires info-file deletion via the
+ * metadata entry's {@link PartialSegmentMetadataCacheEntry#setOnUnmount} hook
so the info entry survives until the
+ * (now SIEVE-driven) cleanup actually fires.
+ * <p>
+ * These tests use {@link StorageLocation#setAreWeakEntriesEphemeral} so
hold-release triggers immediate eviction,
+ * a deterministic substitute for SIEVE's capacity-pressure-driven reclaim.
+ */
+class SegmentLocalCacheManagerPartialDropTest extends
InitializedNullHandlingTest
+{
+ private static final ObjectMapper JSON_MAPPER = TestHelper.makeJsonMapper();
+ private static final SegmentId SEGMENT_ID = SegmentId.of("test",
Intervals.of("2025/2026"), "v1", 0);
+ private static final long ESTIMATE = 16 * 1024 * 1024L;
+ private static final String AGG_BUNDLE = "dim1_metric1_sum";
+
+ private static final DateTime TIME = DateTimes.of("2025-01-01");
+ private static final RowSignature ROW_SIGNATURE = RowSignature.builder()
+ .add("dim1",
ColumnType.STRING)
+
.add("metric1", ColumnType.LONG)
+ .build();
+
+ private static final List<AggregateProjectionSpec> PROJECTIONS =
Collections.singletonList(
+ AggregateProjectionSpec.builder(AGG_BUNDLE)
+ .groupingColumns(new
StringDimensionSchema("dim1"))
+ .aggregators(
+ new LongSumAggregatorFactory("_metric1_sum",
"metric1"),
+ new CountAggregatorFactory("_count")
+ )
+ .build()
+ );
+
+ private static final List<InputRow> ROWS = Arrays.asList(
+ new ListBasedInputRow(ROW_SIGNATURE, TIME,
ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", 1L)),
+ new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(1),
ROW_SIGNATURE.getColumnNames(), Arrays.asList("a", 2L)),
+ new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(2),
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 3L)),
+ new ListBasedInputRow(ROW_SIGNATURE, TIME.plusMinutes(3),
ROW_SIGNATURE.getColumnNames(), Arrays.asList("b", 4L))
+ );
+
+ @TempDir
+ static File sharedTempDir;
+
+ private static File deepStorageDir;
+
+ @TempDir
+ File perTestTempDir;
+
+ private File cacheDir;
+ private File infoDir;
+ private SegmentLocalCacheManager manager;
+ private StorageLocation location;
+ private DataSegment dataSegment;
+
+ @BeforeAll
+ static void buildSegment()
+ {
+ final File tmp = new File(sharedTempDir, "build_" +
ThreadLocalRandom.current().nextInt());
+ deepStorageDir = IndexBuilder.create()
+ .useV10()
+ .tmpDir(tmp)
+
.segmentWriteOutMediumFactory(OffHeapMemorySegmentWriteOutMediumFactory.instance())
+ .schema(
+ IncrementalIndexSchema.builder()
+ .withDimensionsSpec(
+
DimensionsSpec.builder()
+
.setDimensions(
+
List.of(
+
new StringDimensionSchema("dim1"),
+
new LongDimensionSchema("metric1")
+
)
+ )
+
.build()
+ )
+ .withRollup(false)
+
.withMinTimestamp(TIME.getMillis())
+
.withProjections(PROJECTIONS)
+ .build()
+ )
+
.indexSpec(IndexSpec.builder().withMetadataCompression(CompressionStrategy.NONE).build())
+ .rows(ROWS)
+ .buildMMappedIndexFile();
+ }
+
+ @BeforeEach
+ void setup() throws IOException
+ {
+ cacheDir = new File(perTestTempDir, "cache_" +
ThreadLocalRandom.current().nextInt(Integer.MAX_VALUE));
+ FileUtils.mkdirp(cacheDir);
+ // SegmentLocalCacheManager defaults the info dir to
<firstLocation>/info_dir when not configured.
+ infoDir = new File(cacheDir, "info_dir");
+ FileUtils.mkdirp(infoDir);
+
+ final StorageLocationConfig locConfig = new
StorageLocationConfig(cacheDir, ESTIMATE * 16, null);
+ final SegmentLoaderConfig loaderConfig = new
SegmentLoaderConfig().setLocations(List.of(locConfig));
+ final List<StorageLocation> locations = loaderConfig.toStorageLocations();
+ manager = new SegmentLocalCacheManager(
+ locations,
+ loaderConfig,
+ new LeastBytesUsedStorageLocationSelectorStrategy(locations),
+ TestHelper.getTestIndexIO(JSON_MAPPER, ColumnConfig.DEFAULT),
+ JSON_MAPPER
+ );
+ location = manager.getLocations().get(0);
+ // Drive eviction off hold-release so tests don't need to provoke SIEVE
via capacity pressure.
+ location.setAreWeakEntriesEphemeral(true);
+ dataSegment = DataSegment.builder()
Review Comment:
## CodeQL / Deprecated method or constructor invocation
Invoking [DataSegment.builder](1) should be avoided because it has been
deprecated.
[Show more
details](https://github.com/apache/druid/security/code-scanning/11278)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]