This is an automated email from the ASF dual-hosted git repository.
abhishekrb19 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new a289901680c fix: preserve DimensionValueSetShardSpec on upgraded append segments (#19615)
a289901680c is described below
commit a289901680c021b28560916ee1fd4ff5394a5c79
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Mon Jun 22 15:44:09 2026 -0700
fix: preserve DimensionValueSetShardSpec on upgraded append segments (#19615)
When a concurrent REPLACE upgrades a still-appending streaming task, the
upgraded (new-version) copy of each append segment previously adopted the
pending segment's plain NumberedShardSpec, dropping the
DimensionValueSetShardSpec stamped at publish time. This made upgraded
segments unprunable by the broker.
The upgraded copy now takes its partition number and core-partition count
from the pending segment while carrying forward the original segment's
partitionDimensionValues when it is a DimensionValueSetShardSpec. The
value-set guarantee holds because the upgraded copy serves the same rows
as the original append segment.
---
.../IndexerSQLMetadataStorageCoordinator.java | 23 ++-
.../IndexerSQLMetadataStorageCoordinatorTest.java | 184 ++++++++++++++++++++-
2 files changed, 205 insertions(+), 2 deletions(-)
diff --git a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
index fbe30a31e88..3aa06b8e371 100644
--- a/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
+++ b/server/src/main/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinator.java
@@ -65,6 +65,7 @@ import org.apache.druid.timeline.Partitions;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.TimelineObjectHolder;
+import org.apache.druid.timeline.partition.DimensionValueSetShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.PartialShardSpec;
import org.apache.druid.timeline.partition.PartitionChunk;
@@ -1223,10 +1224,11 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
final SegmentId newVersionSegmentId =
pendingSegment.getId().asSegmentId();
newVersionSegmentToParent.put(newVersionSegmentId,
oldSegment.getId());
upgradedFromSegmentIdMap.put(newVersionSegmentId.toString(),
oldSegment.getId().toString());
+ final ShardSpec upgradedShardSpec =
getUpgradedSegmentShardSpec(oldSegment, pendingSegment);
allSegmentsToInsert.add(DataSegment.builder(oldSegment)
.interval(newVersionSegmentId.getInterval())
.version(newVersionSegmentId.getVersion())
-
.shardSpec(pendingSegment.getId().getShardSpec())
+ .shardSpec(upgradedShardSpec)
.build());
}
}
@@ -1287,6 +1289,25 @@ public class IndexerSQLMetadataStorageCoordinator implements IndexerMetadataStor
}
}
+ /**
+ * Builds the shard spec for the upgraded copy of an append segment. The
partition number and core-partition count come
+ * from the pending segment, but a {@link DimensionValueSetShardSpec} on the
original is preserved so the upgraded copy
+ * stays prunable by the broker.
+ */
+ private static ShardSpec getUpgradedSegmentShardSpec(DataSegment oldSegment,
PendingSegmentRecord pendingSegment)
+ {
+ final ShardSpec pendingShardSpec = pendingSegment.getId().getShardSpec();
+ final ShardSpec oldShardSpec = oldSegment.getShardSpec();
+ if (oldShardSpec instanceof DimensionValueSetShardSpec) {
+ return new DimensionValueSetShardSpec(
+ pendingShardSpec.getPartitionNum(),
+ pendingShardSpec.getNumCorePartitions(),
+ ((DimensionValueSetShardSpec)
oldShardSpec).getPartitionDimensionValues()
+ );
+ }
+ return pendingShardSpec;
+ }
+
private Map<SegmentCreateRequest, PendingSegmentRecord> createNewSegments(
SegmentMetadataTransaction transaction,
String dataSource,
diff --git a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
index 238b76f741a..f9ab6078a9a 100644
--- a/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
+++ b/server/src/test/java/org/apache/druid/metadata/IndexerSQLMetadataStorageCoordinatorTest.java
@@ -66,6 +66,7 @@ import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.SegmentId;
import org.apache.druid.timeline.SegmentTimeline;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+import org.apache.druid.timeline.partition.DimensionValueSetShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedPartialShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.LinearShardSpec;
@@ -144,7 +145,12 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
{
derbyConnector = derbyConnectorRule.getConnector();
segmentsTable = derbyConnectorRule.segments();
- mapper.registerSubtypes(LinearShardSpec.class, NumberedShardSpec.class,
HashBasedNumberedShardSpec.class);
+ mapper.registerSubtypes(
+ LinearShardSpec.class,
+ NumberedShardSpec.class,
+ HashBasedNumberedShardSpec.class,
+ DimensionValueSetShardSpec.class
+ );
derbyConnector.createDataSourceTable();
derbyConnector.createTaskTables();
derbyConnector.createSegmentTable();
@@ -438,6 +444,106 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
Assert.assertEquals(replaceLock.getVersion(),
Iterables.getOnlyElement(observedLockVersions));
}
+ /**
+ * When a concurrent REPLACE upgrades a still-appending task, the upgraded
copy must take its partition number and
+ * core-partition count from the (numbered) pending segment while preserving
the original append segment's
+ * {@link DimensionValueSetShardSpec}, so it stays prunable by the broker.
+ */
+ @Test
+ public void
testCommitAppendSegments_upgradedSegmentPreservesDimensionValueSetShardSpec()
+ {
+ final String appendVersion = "2023-01-01";
+ final String upgradedVersion = "2023-02-01";
+
+ final String taskAllocatorId = "appendTask";
+ final String replaceTaskId = "replaceTask1";
+ final ReplaceTaskLock replaceLock = new ReplaceTaskLock(
+ replaceTaskId,
+ Intervals.of("2023-01-01/2023-02-01"),
+ upgradedVersion
+ );
+
+ final Map<String, List<String>> partitionDimensionValues =
ImmutableMap.of("tenant_id", ImmutableList.of("tenant_a"));
+ // The published append segment carries a DimensionValueSetShardSpec, as
stamped at publish time by the streaming task.
+ final DataSegment appendSegment = createSegment(
+ Intervals.of("2023-01-01/2023-01-02"),
+ appendVersion,
+ new DimensionValueSetShardSpec(0, 1, partitionDimensionValues)
+ );
+
+ final List<PendingSegmentRecord> pendingSegmentsForTask = new
ArrayList<>();
+ // The pending segment for the append segment itself.
+ pendingSegmentsForTask.add(
+ PendingSegmentRecord.create(
+ SegmentIdWithShardSpec.fromDataSegment(appendSegment),
+ appendVersion,
+ appendSegment.getId().toString(),
+ null,
+ taskAllocatorId
+ )
+ );
+ // The upgraded pending segment minted by the concurrent REPLACE —
numbered, pointing back to the append segment.
+ final SegmentIdWithShardSpec upgradedPendingId = new
SegmentIdWithShardSpec(
+ TestDataSource.WIKI,
+ Intervals.of("2023-01-01/2023-02-01"),
+ upgradedVersion,
+ new NumberedShardSpec(5, 8)
+ );
+ pendingSegmentsForTask.add(
+ PendingSegmentRecord.create(
+ upgradedPendingId,
+ upgradedVersion,
+ appendSegment.getId().toString(),
+ appendSegment.getId().toString(),
+ taskAllocatorId
+ )
+ );
+ insertPendingSegments(TestDataSource.WIKI, pendingSegmentsForTask, false);
+
+ final SegmentPublishResult commitResult = coordinator.commitAppendSegments(
+ Set.of(appendSegment),
+ Map.of(appendSegment, replaceLock),
+ taskAllocatorId,
+ null
+ );
+ Assert.assertTrue(commitResult.isSuccess());
+
+ final Set<DataSegment> allCommittedSegments
+ = new
HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+ final Map<String, String> upgradedFromSegmentIdMap =
coordinator.retrieveUpgradedFromSegmentIds(
+ TestDataSource.WIKI,
+
allCommittedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
+ );
+
+ // The original append segment is published as-is, retaining its
DimensionValueSetShardSpec.
+ Assert.assertTrue(allCommittedSegments.contains(appendSegment));
+ Assert.assertTrue(appendSegment.getShardSpec() instanceof
DimensionValueSetShardSpec);
+
+ // Find the upgraded copy (the one whose upgradedFromSegmentId points back
to the append segment).
+ DataSegment upgradedSegment = null;
+ for (DataSegment segment : allCommittedSegments) {
+ if
(appendSegment.getId().toString().equals(upgradedFromSegmentIdMap.get(segment.getId().toString())))
{
+ upgradedSegment = segment;
+ }
+ }
+ Assert.assertNotNull("Expected an upgraded copy of the append segment",
upgradedSegment);
+
+ // The upgraded copy is published under the replace version, with the
pending segment's partition number and core
+ // partitions, but it preserves the original DimensionValueSetShardSpec
(and partitionDimensionValues).
+ Assert.assertEquals(upgradedVersion, upgradedSegment.getVersion());
+ Assert.assertTrue(
+ "upgraded append segment should preserve DimensionValueSetShardSpec",
+ upgradedSegment.getShardSpec() instanceof DimensionValueSetShardSpec
+ );
+ Assert.assertEquals(
+ partitionDimensionValues,
+ ((DimensionValueSetShardSpec)
upgradedSegment.getShardSpec()).getPartitionDimensionValues()
+ );
+ // Partition number and core partitions come from the (numbered) pending
segment.
+ Assert.assertEquals(5, upgradedSegment.getShardSpec().getPartitionNum());
+ Assert.assertEquals(8,
upgradedSegment.getShardSpec().getNumCorePartitions());
+ }
+
@Test
public void
testCommitReplaceSegments_partiallyOverlappingPendingSegmentUnsupported()
{
@@ -632,6 +738,82 @@ public class IndexerSQLMetadataStorageCoordinatorTest extends IndexerSqlMetadata
);
}
+ /**
+ * When a REPLACE commits over an interval with already-published APPEND
segments held under a REPLACE lock, the
+ * upgraded (re-versioned) copies must preserve their {@link
DimensionValueSetShardSpec} so they remain prunable by
+ * the broker.
+ */
+ @Test
+ public void
testCommitReplaceSegments_upgradedPublishedSegmentPreservesDimensionValueSetShardSpec()
+ {
+ final ReplaceTaskLock replaceLock = new ReplaceTaskLock("g1",
Intervals.of("2023-01-01/2023-02-01"), "2023-02-01");
+
+ final Map<String, List<String>> partitionDimensionValues =
ImmutableMap.of("tenant_id", ImmutableList.of("tenant_a"));
+ // A published APPEND segment carrying a DimensionValueSetShardSpec (as
stamped at publish time by the streaming task).
+ final DataSegment appendSegment = new DataSegment(
+ "foo",
+ Intervals.of("2023-01-01/2023-01-02"),
+ "2023-01-01",
+ ImmutableMap.of("path", "a-0"),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new DimensionValueSetShardSpec(0, 1, partitionDimensionValues),
+ 9,
+ 100
+ );
+ segmentSchemaTestUtils.insertUsedSegments(Set.of(appendSegment),
Collections.emptyMap());
+ insertIntoUpgradeSegmentsTable(
+ Map.of(appendSegment, replaceLock),
+ derbyConnectorRule.metadataTablesConfigSupplier().get()
+ );
+
+ final Set<DataSegment> replacingSegments = new HashSet<>();
+ for (int i = 0; i < 4; i++) {
+ replacingSegments.add(
+ new DataSegment(
+ "foo",
+ Intervals.of("2023-01-01/2023-02-01"),
+ "2023-02-01",
+ ImmutableMap.of("path", "b-" + i),
+ ImmutableList.of("dim1"),
+ ImmutableList.of("m1"),
+ new NumberedShardSpec(i, 4),
+ 9,
+ 100
+ )
+ );
+ }
+ Assert.assertTrue(coordinator.commitReplaceSegments(replacingSegments,
Set.of(replaceLock), null).isSuccess());
+
+ final Set<DataSegment> usedSegments
+ = new
HashSet<>(retrieveUsedSegments(derbyConnectorRule.metadataTablesConfigSupplier().get()));
+ final Map<String, String> upgradedFromSegmentIdMap =
coordinator.retrieveUpgradedFromSegmentIds(
+ "foo",
+
usedSegments.stream().map(DataSegment::getId).map(SegmentId::toString).collect(Collectors.toSet())
+ );
+
+ // Find the upgraded copy of the append segment (the one whose
upgradedFromSegmentId points back to it).
+ DataSegment upgradedSegment = null;
+ for (DataSegment segment : usedSegments) {
+ if
(appendSegment.getId().toString().equals(upgradedFromSegmentIdMap.get(segment.getId().toString())))
{
+ upgradedSegment = segment;
+ }
+ }
+ Assert.assertNotNull("Expected an upgraded published segment",
upgradedSegment);
+
+ // The upgraded published segment is re-versioned to the replace version
but keeps its DimensionValueSetShardSpec
+ // (and partitionDimensionValues), so it remains prunable by the broker.
+ Assert.assertEquals("2023-02-01", upgradedSegment.getVersion());
+ Assert.assertTrue(
+ "upgraded published segment should preserve
DimensionValueSetShardSpec",
+ upgradedSegment.getShardSpec() instanceof DimensionValueSetShardSpec
+ );
+ Assert.assertEquals(
+ partitionDimensionValues,
+ ((DimensionValueSetShardSpec)
upgradedSegment.getShardSpec()).getPartitionDimensionValues()
+ );
+ }
+
@Test
public void testCommitReplaceSegmentsWithUpdatedCorePartitions()
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]