This is an automated email from the ASF dual-hosted git repository.
abhishekrb19 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/master by this push:
new ce74072496f feat: Prunable shard specs for streaming published
segments (#19571)
ce74072496f is described below
commit ce74072496ff8179c93008a202d6ebc4b893c148
Author: Abhishek Radhakrishnan <[email protected]>
AuthorDate: Fri Jun 12 13:56:20 2026 -0700
feat: Prunable shard specs for streaming published segments (#19571)
Kafka ingestion can now publish segments that the broker prunes at query
time, without waiting for compaction. Set
tuningConfig.streamingPartitionsSpec.partitionDimensions to a list of
low-to-medium cardinality dimensions; each task records the distinct values it
observes per dimension and stamps them onto a new dim_value_set shard spec.
Queries that filter on a declared dimension then skip segments whose values
can't match. The feature is opt-in, Kafka-only, and disabled by default; [...]
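
The per-task bookkeeping described above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in this commit: the class name ObservedValueAccumulator, its methods, and the use of plain string segment ids are all hypothetical. It only shows the idea of recording sorted distinct values per tracked dimension for each segment, with a missing dimension recorded as null.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical accumulator for illustration; not a class added by this commit. */
public class ObservedValueAccumulator
{
  private final List<String> trackedDimensions;
  // segment id -> dimension -> sorted distinct values (null sorts first)
  private final Map<String, Map<String, TreeSet<String>>> observed = new LinkedHashMap<>();

  public ObservedValueAccumulator(List<String> trackedDimensions)
  {
    this.trackedDimensions = new ArrayList<>(trackedDimensions);
  }

  /** Records the row's value for every tracked dimension; a missing dimension is recorded as null. */
  public void observe(String segmentId, Map<String, String> row)
  {
    Map<String, TreeSet<String>> perDimension =
        observed.computeIfAbsent(segmentId, id -> new LinkedHashMap<>());
    for (String dimension : trackedDimensions) {
      perDimension
          .computeIfAbsent(
              dimension,
              d -> new TreeSet<String>(Comparator.nullsFirst(Comparator.<String>naturalOrder()))
          )
          .add(row.get(dimension));
    }
  }

  /** Returns the values to stamp onto the given segment at publish time (empty if nothing was observed). */
  public Map<String, List<String>> valuesFor(String segmentId)
  {
    Map<String, List<String>> result = new LinkedHashMap<>();
    Map<String, TreeSet<String>> perDimension =
        observed.getOrDefault(segmentId, Collections.emptyMap());
    for (Map.Entry<String, TreeSet<String>> entry : perDimension.entrySet()) {
      result.put(entry.getKey(), new ArrayList<>(entry.getValue()));
    }
    return result;
  }
}
```

Keeping one value set per segment, rather than one per task, is what lets each published segment carry only the values it actually contains.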
Compatibility: dim_value_set is a new core shard spec type with no
fallback, so it is not forward-compatible. Upgrade all services before enabling
streamingPartitionsSpec. Once dim_value_set segments are published, downgrade
is unsupported until they are compacted away or streamingPartitionsSpec is
removed.
Highlights:
- StreamRangeShardSpec extends NumberedShardSpec; possibleInDomain prunes by
  per-value range intersection. Null is declared as a first-class value
  (encoded as Range.lessThan("")) so IS NULL queries are never wrongly pruned,
  and is kept distinct from the empty string.
- Opt-in via partitionFilterDimensions on the Kafka supervisor/IOConfig
  (null by default; segments otherwise get a plain NumberedShardSpec). Kafka
  only for now; backward-compatible config (old specs/constructors unchanged).
- Per-segment value accumulation at ingest time; each segment is stamped with
  only its own observed values at publish.
- Correctness guards: restart-spanning segments fall back to NumberedShardSpec
  (pre-restart rows are not re-read, so their values can't be fully observed);
  dimensions that observed a null/missing value declare null so IS NULL is not
  pruned.
- BaseAppenderatorDriver reconciles the returned SegmentsAndCommitMetadata to
  the published shard specs so handoff/publish logs report the real spec.
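
The pruning decision summarized in the highlights can be illustrated with a simplified sketch. Assumptions are stated loudly here: the commit describes pruning by range intersection with null encoded as a range below the empty string, whereas this sketch swaps in plain set membership to keep the example dependency-free. The class ValueSetPruningSketch and the map-based filter representation are hypothetical and are not the signature of the real possibleInDomain.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Hypothetical, simplified pruning check; uses set membership instead of range intersection. */
public class ValueSetPruningSketch
{
  /**
   * Returns false only when the segment provably holds no matching row.
   *
   * @param segmentValues values declared per tracked dimension; a null element means
   *                      a null/missing value was observed for that dimension
   * @param filterValues  values accepted by an equality/IN style filter per dimension;
   *                      a null element stands for IS NULL. Use a null-tolerant set
   *                      such as HashSet, because Set.of rejects null lookups.
   */
  public static boolean possibleInDomain(
      Map<String, List<String>> segmentValues,
      Map<String, Set<String>> filterValues
  )
  {
    for (Map.Entry<String, Set<String>> filter : filterValues.entrySet()) {
      List<String> declared = segmentValues.get(filter.getKey());
      if (declared == null) {
        // Untracked dimension: nothing is known, so the segment cannot be pruned.
        continue;
      }
      boolean overlaps = false;
      for (String value : declared) {
        // null and "" are different elements, so IS NULL never matches an empty string.
        if (filter.getValue().contains(value)) {
          overlaps = true;
          break;
        }
      }
      if (!overlaps) {
        return false;
      }
    }
    return true;
  }
}
```

A segment that falls back to a plain NumberedShardSpec has no declared values at all, which corresponds to the untracked-dimension branch here: it is always scanned.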
Tests:
- StreamRangeShardSpecTest: possibleInDomain matrix incl. null vs "" and serde.
- SeekableStreamIndexTaskRunnerTest: annotator unit tests (restart fallback,
  null handling).
- EmbeddedStreamRangeShardSpecTest: end-to-end pruning verified via the
  query/segment/time scan metric across a predicate matrix (=, !=, IN, NOT IN,
  IS NULL, IS NOT NULL, multi-value, untracked dimension, non-existent value),
  plus a no-partitioning control twin and in-memory/graceful-widening cases.
- StreamAppenderatorDriverTest: returned metadata carries the published spec.
* Comments
* Move to tuningConfig.
* Stamp with empty partitions spec when segments cannot be recovered.
* Sort partitionDimension values.
* Use SegmentId for strongly typed identifier & tests
* Rename stream_range to dim_value_set to better capture the intent.
Renames related classes as well.
* Document that numeric types are not eligible for pruning & additional test
coverage.
* Assert row values too in addition to counts
* Cleanup
---
docs/ingestion/kafka-ingestion.md | 46 +
.../indexing/kafka/KafkaIndexTaskTuningConfig.java | 17 +-
.../supervisor/KafkaSupervisorTuningConfig.java | 11 +-
.../druid/indexing/kafka/KafkaIndexTaskTest.java | 1 +
.../kafka/KafkaIndexTaskTuningConfigTest.java | 87 +-
.../EmbeddedDimensionValueSetShardSpecTest.java | 1058 ++++++++++++++++++++
.../kafka/supervisor/KafkaTuningConfigBuilder.java | 11 +-
.../TestModifiedKafkaIndexTaskTuningConfig.java | 1 +
.../SeekableStreamIndexTaskRunner.java | 138 ++-
.../SeekableStreamIndexTaskTuningConfig.java | 77 +-
.../seekablestream/StreamingPartitionsSpec.java | 89 ++
.../SeekableStreamIndexTaskRunnerTest.java | 258 ++++-
.../partition/DimensionValueSetShardSpec.java | 164 +++
.../apache/druid/timeline/partition/ShardSpec.java | 2 +
.../partition/DimensionValueSetShardSpecTest.java | 313 ++++++
.../appenderator/BaseAppenderatorDriver.java | 15 +-
.../appenderator/SegmentsAndCommitMetadata.java | 28 +
.../appenderator/StreamAppenderatorDriver.java | 12 +-
.../appenderator/SegmentPublisherHelperTest.java | 19 +
.../appenderator/StreamAppenderatorDriverTest.java | 42 +
20 files changed, 2369 insertions(+), 20 deletions(-)
diff --git a/docs/ingestion/kafka-ingestion.md
b/docs/ingestion/kafka-ingestion.md
index 5216a84b4e4..63ceed8597a 100644
--- a/docs/ingestion/kafka-ingestion.md
+++ b/docs/ingestion/kafka-ingestion.md
@@ -263,6 +263,51 @@ The following example shows a supervisor spec with idle
configuration enabled:
```
</details>
+#### Streaming partitions spec
+
+When you set `streamingPartitionsSpec.partitionDimensions` in the tuning
config, the supervisor tracks the distinct values observed for each listed
dimension during ingestion. At segment publish time, each segment is annotated
with only the values it actually ingested by publishing it with a
`dim_value_set` shard spec, which records the observed values per tracked
dimension. The broker then uses these annotations to skip segments at query
time when the query filter doesn't intersect the [...]
+
+This enables segment pruning for streaming-ingested data without waiting for
compaction to produce hash or range-partitioned segments. The
`partitionDimensions` should be kept in sync with the compaction config's
`partitionDimensions` for the same datasource.
+
+**Usage guidelines:**
+
+- Only string-typed dimensions are currently supported.
+- Use only low-to-medium cardinality dimensions (for example, `tenant_id`,
`region`, `environment`). High-cardinality dimensions bloat segment metadata
with no pruning benefit.
+- Most effective when Kafka partitions are keyed by the tracked dimension (for
example, using tenant ID as the message key). Each task naturally sees a subset
of values, and segments get tight filter annotations.
+- Also works with multiple supervisors reading from separate topics into one
datasource.
+- Use a range or hashed compaction `partitionsSpec`, not the dynamic strategy:
dynamic compaction does not partition by dimension, so it cannot preserve
pruning after compaction.
+- After compaction, the streaming pruning annotations are replaced by the
compaction output's partitioning (hash or range), which provides its own
pruning.
+
+The following example configures a supervisor to track the `tenant` dimension:
+
+```json
+{
+ "type": "kafka",
+ "spec": {
+ "dataSchema": {
+ "dataSource": "multi_tenant_events",
+ "timestampSpec": {"column": "timestamp", "format": "iso"},
+ "dimensionsSpec": {"dimensions": ["tenant", "region", "event_type"]},
+ "granularitySpec": {"type": "uniform", "segmentGranularity": "HOUR",
"queryGranularity": "NONE"}
+ },
+ "ioConfig": {
+ "type": "kafka",
+ "topic": "events",
+ "consumerProperties": {"bootstrap.servers": "localhost:9092"},
+ "inputFormat": {"type": "json"},
+ "taskCount": 4,
+ "taskDuration": "PT1H"
+ },
+ "tuningConfig": {
+ "type": "kafka",
+ "streamingPartitionsSpec": {"partitionDimensions": ["tenant"]}
+ }
+ }
+}
+```
+
+With this configuration, a query like `SELECT * FROM multi_tenant_events WHERE
tenant = 'acme'` skips segments that contain no rows for `acme`, reducing the
number of segments scanned.
+
#### Data format
The Kafka indexing service supports
[`inputFormat`](data-formats.md#input-format). For more information, see
[Source input formats](data-formats.md).
@@ -440,6 +485,7 @@ For configuration properties shared across all streaming
ingestion methods, refe
|Property|Type|Description|Required|Default|
|--------|----|-----------|--------|-------|
|`numPersistThreads`|Integer|The number of threads to use to create and
persist incremental segments on the disk. Higher ingestion data throughput
results in a larger number of incremental segments, causing significant CPU
time to be spent on the creation of the incremental segments on the disk. For
datasources with number of columns running into hundreds or thousands, creation
of the incremental segments may take up significant time, in the order of
multiple seconds. In both of these sc [...]
+|`streamingPartitionsSpec`|Object|Configures query-time segment pruning for
streaming-ingested segments. Contains a single property, `partitionDimensions`
(List of String), the dimensions whose observed values each segment records so
the broker can skip segments that can't match a query filter. See [Streaming
partitions spec](#streaming-partitions-spec) for details.|No|null|
## Deployment notes on Kafka partitions and Druid segments
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
index 3fa046b63e6..fa385fcc12b 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfig.java
@@ -22,6 +22,7 @@ package org.apache.druid.indexing.kafka;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import
org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.StreamingPartitionsSpec;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
import org.apache.druid.segment.writeout.SegmentWriteOutMediumFactory;
@@ -54,7 +55,8 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
@Nullable Integer maxSavedParseExceptions,
@Nullable Integer numPersistThreads,
@Nullable Integer maxColumnsToMerge,
- @Nullable Boolean releaseLocksOnHandoff
+ @Nullable Boolean releaseLocksOnHandoff,
+ @Nullable StreamingPartitionsSpec streamingPartitionsSpec
)
{
super(
@@ -80,7 +82,8 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
- releaseLocksOnHandoff
+ releaseLocksOnHandoff,
+ streamingPartitionsSpec
);
}
@@ -106,7 +109,8 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
@JsonProperty("maxSavedParseExceptions") @Nullable Integer
maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
- @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean
releaseLocksOnHandoff
+ @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean
releaseLocksOnHandoff,
+ @JsonProperty("streamingPartitionsSpec") @Nullable
StreamingPartitionsSpec streamingPartitionsSpec
)
{
this(
@@ -131,7 +135,8 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
- releaseLocksOnHandoff
+ releaseLocksOnHandoff,
+ streamingPartitionsSpec
);
}
@@ -160,7 +165,8 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
getMaxSavedParseExceptions(),
getNumPersistThreads(),
getMaxColumnsToMerge(),
- isReleaseLocksOnHandoff()
+ isReleaseLocksOnHandoff(),
+ getStreamingPartitionsSpec()
);
}
@@ -188,6 +194,7 @@ public class KafkaIndexTaskTuningConfig extends
SeekableStreamIndexTaskTuningCon
", maxSavedParseExceptions=" + getMaxSavedParseExceptions() +
", numPersistThreads=" + getNumPersistThreads() +
", getMaxColumnsToMerge=" + getMaxColumnsToMerge() +
+ ", streamingPartitionsSpec=" + getStreamingPartitionsSpec() +
'}';
}
diff --git
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
index c4a21674d30..f08f652e4bd 100644
---
a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
+++
b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/indexing/kafka/supervisor/KafkaSupervisorTuningConfig.java
@@ -21,6 +21,7 @@ package org.apache.druid.indexing.kafka.supervisor;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.indexing.kafka.KafkaIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.StreamingPartitionsSpec;
import
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisorTuningConfig;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.incremental.AppendableIndexSpec;
@@ -67,6 +68,7 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
null,
null,
null,
+ null,
null
);
}
@@ -97,7 +99,8 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
@JsonProperty("maxSavedParseExceptions") @Nullable Integer
maxSavedParseExceptions,
@JsonProperty("numPersistThreads") @Nullable Integer numPersistThreads,
@JsonProperty("maxColumnsToMerge") @Nullable Integer maxColumnsToMerge,
- @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean
releaseLocksOnHandoff
+ @JsonProperty("releaseLocksOnHandoff") @Nullable Boolean
releaseLocksOnHandoff,
+ @JsonProperty("streamingPartitionsSpec") @Nullable
StreamingPartitionsSpec streamingPartitionsSpec
)
{
super(
@@ -122,7 +125,8 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
- releaseLocksOnHandoff
+ releaseLocksOnHandoff,
+ streamingPartitionsSpec
);
this.workerThreads = workerThreads;
this.chatRetries = (chatRetries != null ? chatRetries :
DEFAULT_CHAT_RETRIES);
@@ -237,7 +241,8 @@ public class KafkaSupervisorTuningConfig extends
KafkaIndexTaskTuningConfig
getMaxSavedParseExceptions(),
getNumPersistThreads(),
getMaxColumnsToMerge(),
- isReleaseLocksOnHandoff()
+ isReleaseLocksOnHandoff(),
+ getStreamingPartitionsSpec()
);
}
}
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
index 5be3f60e6ae..289d774f05e 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTest.java
@@ -2901,6 +2901,7 @@ public class KafkaIndexTaskTest extends
SeekableStreamIndexTaskTestBase
maxSavedParseExceptions,
null,
null,
+ null,
null
);
if (!context.containsKey(SeekableStreamSupervisor.CHECKPOINTS_CTX_KEY)) {
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
index 0083e76ea75..4bf668bbc86 100644
---
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/KafkaIndexTaskTuningConfigTest.java
@@ -24,6 +24,7 @@ import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorTuningConfig;
import org.apache.druid.indexing.kafka.supervisor.KafkaTuningConfigBuilder;
import
org.apache.druid.indexing.kafka.test.TestModifiedKafkaIndexTaskTuningConfig;
+import org.apache.druid.indexing.seekablestream.StreamingPartitionsSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.segment.IndexSpec;
import org.apache.druid.segment.data.CompressionStrategy;
@@ -36,6 +37,9 @@ import org.junit.Test;
import java.io.File;
import java.io.IOException;
import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
public class KafkaIndexTaskTuningConfigTest
{
@@ -128,6 +132,86 @@ public class KafkaIndexTaskTuningConfigTest
Assert.assertEquals(-1, config.getMaxColumnsToMerge());
}
+ @Test
+ public void testSerdeWithStreamingPartitionsSpec() throws Exception
+ {
+ final String jsonStr = "{\n"
+ + " \"type\": \"kafka\",\n"
+ + " \"streamingPartitionsSpec\":
{\"partitionDimensions\": [\"tenant\", \"region\"]}\n"
+ + "}";
+
+ final KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig)
mapper.readValue(
+ mapper.writeValueAsString(mapper.readValue(jsonStr,
TuningConfig.class)),
+ TuningConfig.class
+ );
+
+ Assert.assertEquals(
+ new StreamingPartitionsSpec(List.of("tenant", "region")),
+ config.getStreamingPartitionsSpec()
+ );
+ Assert.assertEquals(List.of("tenant", "region"),
config.getStreamingPartitionsSpec().getPartitionDimensions());
+ }
+
+ @Test
+ public void testSerdeWithoutStreamingPartitionsSpecIsNull() throws Exception
+ {
+ final KafkaIndexTaskTuningConfig config = (KafkaIndexTaskTuningConfig)
mapper.readValue(
+ mapper.writeValueAsString(mapper.readValue("{\"type\": \"kafka\"}",
TuningConfig.class)),
+ TuningConfig.class
+ );
+ Assert.assertNull(config.getStreamingPartitionsSpec());
+ }
+
+ @Test
+ public void testSerdeWithEmptyPartitionDimensions() throws Exception
+ {
+ final KafkaIndexTaskTuningConfig config =
roundTripWithStreamingPartitionsSpec("[]");
+ Assert.assertEquals(Collections.emptyList(),
config.getStreamingPartitionsSpec().getPartitionDimensions());
+ }
+
+ @Test
+ public void testSerdeWithNullPartitionDimensionsCoalescesToEmpty() throws
Exception
+ {
+ final KafkaIndexTaskTuningConfig config =
roundTripWithStreamingPartitionsSpec("null");
+ Assert.assertEquals(Collections.emptyList(),
config.getStreamingPartitionsSpec().getPartitionDimensions());
+ }
+
+ @Test
+ public void testSerdeWithEmptyStringPartitionDimension() throws Exception
+ {
+ // An empty-string dimension name is preserved verbatim (it simply never
matches an ingested value).
+ final KafkaIndexTaskTuningConfig config =
roundTripWithStreamingPartitionsSpec("[\"\"]");
+ Assert.assertEquals(List.of(""),
config.getStreamingPartitionsSpec().getPartitionDimensions());
+ }
+
+ @Test
+ public void testSerdeWithNumericLookingPartitionDimension() throws Exception
+ {
+ // Dimension names are plain strings; a numeric-looking name is just a
string.
+ final KafkaIndexTaskTuningConfig config =
roundTripWithStreamingPartitionsSpec("[\"123\"]");
+ Assert.assertEquals(List.of("123"),
config.getStreamingPartitionsSpec().getPartitionDimensions());
+ }
+
+ @Test
+ public void testSerdeWithNullElementInPartitionDimensions() throws Exception
+ {
+ final KafkaIndexTaskTuningConfig config =
roundTripWithStreamingPartitionsSpec("[\"tenant\", null]");
+ Assert.assertEquals(Arrays.asList("tenant", null),
config.getStreamingPartitionsSpec().getPartitionDimensions());
+ }
+
+ private KafkaIndexTaskTuningConfig
roundTripWithStreamingPartitionsSpec(String partitionDimensionsJson)
+ throws IOException
+ {
+ final String jsonStr = "{\n"
+ + " \"type\": \"kafka\",\n"
+ + " \"streamingPartitionsSpec\":
{\"partitionDimensions\": " + partitionDimensionsJson + "}\n"
+ + "}";
+ return (KafkaIndexTaskTuningConfig) mapper.readValue(
+ mapper.writeValueAsString(mapper.readValue(jsonStr,
TuningConfig.class)),
+ TuningConfig.class
+ );
+ }
+
@Test
public void testConvert()
{
@@ -186,7 +270,8 @@ public class KafkaIndexTaskTuningConfigTest
42,
2,
-1,
- false
+ false,
+ null
);
String serialized = mapper.writeValueAsString(base);
diff --git
a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedDimensionValueSetShardSpecTest.java
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedDimensionValueSetShardSpecTest.java
new file mode 100644
index 00000000000..855eadbae99
--- /dev/null
+++
b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/simulate/EmbeddedDimensionValueSetShardSpecTest.java
@@ -0,0 +1,1058 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.kafka.simulate;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.impl.CsvInputFormat;
+import org.apache.druid.data.input.impl.DimensionsSpec;
+import org.apache.druid.data.input.impl.LongDimensionSchema;
+import org.apache.druid.data.input.impl.StringDimensionSchema;
+import org.apache.druid.data.input.impl.TimestampSpec;
+import org.apache.druid.indexer.granularity.UniformGranularitySpec;
+import org.apache.druid.indexing.common.task.Tasks;
+import org.apache.druid.indexing.kafka.KafkaIndexTaskModule;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpec;
+import org.apache.druid.indexing.kafka.supervisor.KafkaSupervisorSpecBuilder;
+import org.apache.druid.indexing.seekablestream.StreamingPartitionsSpec;
+import org.apache.druid.java.util.common.DateTimes;
+import org.apache.druid.java.util.common.StringUtils;
+import org.apache.druid.java.util.common.granularity.Granularities;
+import org.apache.druid.query.DruidMetrics;
+import org.apache.druid.testing.embedded.EmbeddedBroker;
+import org.apache.druid.testing.embedded.EmbeddedCoordinator;
+import org.apache.druid.testing.embedded.EmbeddedDruidCluster;
+import org.apache.druid.testing.embedded.EmbeddedHistorical;
+import org.apache.druid.testing.embedded.EmbeddedIndexer;
+import org.apache.druid.testing.embedded.EmbeddedOverlord;
+import org.apache.druid.testing.embedded.junit5.EmbeddedClusterTestBase;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.joda.time.DateTime;
+import org.joda.time.Period;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Embedded integration tests for {@link
org.apache.druid.timeline.partition.DimensionValueSetShardSpec}: end-to-end
+ * ingestion, publish, query correctness, and broker segment pruning feature
when Kafka tasks hand off segments.
+ */
+public class EmbeddedDimensionValueSetShardSpecTest extends
EmbeddedClusterTestBase
+{
+ private static final String COL_TIMESTAMP = "timestamp";
+ private static final String COL_TENANT = "tenant";
+ private static final String COL_VALUE = "value";
+
+ private static final String TENANT_A = "tenant_a";
+ private static final String TENANT_B = "tenant_b";
+
+ private static final int ROWS_PER_TENANT = 10;
+
+ private final EmbeddedBroker broker = new EmbeddedBroker();
+ private final EmbeddedIndexer indexer = new EmbeddedIndexer();
+ private final EmbeddedOverlord overlord = new EmbeddedOverlord();
+ private final EmbeddedHistorical historical = new EmbeddedHistorical();
+ private final EmbeddedCoordinator coordinator = new EmbeddedCoordinator();
+ private KafkaResource kafkaServer;
+
+ @Override
+ public EmbeddedDruidCluster createCluster()
+ {
+ final EmbeddedDruidCluster cluster =
EmbeddedDruidCluster.withEmbeddedDerbyAndZookeeper();
+ indexer.addProperty("druid.segment.handoff.pollDuration", "PT0.1s");
+ // Short emission period so ingest/events/processed fires within the latch
wait timeout
+ cluster.addCommonProperty("druid.monitoring.emissionPeriod", "PT0.5s");
+ // Defense-in-depth: keep the broker cache off so every query re-scans
segments and emits
+ // query/segment/time (CacheConfig already defaults all flavors to false).
+ broker.addProperty("druid.broker.cache.useCache", "false");
+ broker.addProperty("druid.broker.cache.populateCache", "false");
+ broker.addProperty("druid.broker.cache.useResultLevelCache", "false");
+ broker.addProperty("druid.broker.cache.populateResultLevelCache", "false");
+
+ kafkaServer = new KafkaResource();
+
+ cluster.addExtension(KafkaIndexTaskModule.class)
+ .addResource(kafkaServer)
+ .useLatchableEmitter()
+ .addServer(coordinator)
+ .addServer(overlord)
+ .addServer(indexer)
+ .addServer(historical)
+ .addServer(broker);
+
+ return cluster;
+ }
+
+ @Test
+ public void
test_twoSupervisors_withPartitionDimensionValues_segmentsPublishAndQueriesSucceed()
+ {
+ final String topicA = dataSource + "_topic_a";
+ final String topicB = dataSource + "_topic_b";
+ kafkaServer.createTopicWithPartitions(topicA, 1);
+ kafkaServer.createTopicWithPartitions(topicB, 1);
+
+ // Produce tenant-segregated records to each topic
+ kafkaServer.produceRecordsToTopic(generateRecords(topicA, TENANT_A,
ROWS_PER_TENANT, DateTimes.of("2025-01-01")));
+ kafkaServer.produceRecordsToTopic(generateRecords(topicB, TENANT_B,
ROWS_PER_TENANT, DateTimes.of("2025-01-01")));
+
+ // Start supervisor A — observes tenant dimension from topic A
+ final String supervisorIdA = dataSource + "_supe_a";
+ final KafkaSupervisorSpec specA = newSupervisorBuilder()
+ .withId(supervisorIdA)
+ .build(dataSource, topicA);
+ Assertions.assertEquals(supervisorIdA,
cluster.callApi().postSupervisor(specA));
+
+ // Start supervisor B — observes tenant dimension from topic B
+ final String supervisorIdB = dataSource + "_supe_b";
+ final KafkaSupervisorSpec specB = newSupervisorBuilder()
+ .withId(supervisorIdB)
+ .build(dataSource, topicB);
+ Assertions.assertEquals(supervisorIdB,
cluster.callApi().postSupervisor(specB));
+
+ awaitRowsProcessed(ROWS_PER_TENANT * 2);
+
+ // Realtime: all rows from both tenants are queryable.
+ assertRowCounts();
+
+ // Suspend both supervisors → handoff. (Per-task query routing is covered
by DimensionValueSetShardSpecTest;
+ // here both tasks share a host:port so query/node/time can't distinguish
them — counts confirm correctness.)
+ cluster.callApi().postSupervisor(specA.createSuspendedSpec());
+ cluster.callApi().postSupervisor(specB.createSuspendedSpec());
+ indexer.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("ingest/handoff/count")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+ agg -> agg.hasSumAtLeast(2)
+ );
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator,
broker);
+
+ // Historical: same counts after publish.
+ assertRowCounts();
+
+ // Verify sys.segments: all published segments carry a dim_value_set shard
spec
+ verifyAllSegmentsHaveDimensionValueSetShardSpec(dataSource);
+
+ // Verify the partitionDimensionValues contain the expected tenant values
for each supervisor's segments
+ final List<Map<String, Object>> shardSpecs = getShardSpecs(dataSource);
+ @SuppressWarnings("unchecked")
+ final Set<String> allObservedTenants = shardSpecs.stream()
+ .map(spec -> ((Map<String, List<String>>)
spec.get("partitionDimensionValues")).get(COL_TENANT))
+ .flatMap(List::stream)
+ .collect(Collectors.toSet());
+ Assertions.assertTrue(allObservedTenants.contains(TENANT_A), "Expected
tenant_a in partitionDimensionValues");
+ Assertions.assertTrue(allObservedTenants.contains(TENANT_B), "Expected
tenant_b in partitionDimensionValues");
+ }
+
+ @Test
+ public void test_multiDimensionAndMultiValuePartitionDimensionValues()
+ {
+ final String colRegion = "region";
+ final String colRegionCode = "region_code"; // numeric (Long)
+
+ final String topicA = dataSource + "_topic_a";
+ final String topicB = dataSource + "_topic_b";
+ kafkaServer.createTopicWithPartitions(topicA, 1);
+ kafkaServer.createTopicWithPartitions(topicB, 1);
+
+ // Topic A: two tenants (tenant_a, tenant_x) across two regions (us-west,
eu-west) with codes 1, 2
+ final List<ProducerRecord<byte[], byte[]>> recordsA = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ recordsA.add(record(topicA, "%s,tenant_a,us-west,1,val_%d",
DateTimes.of("2025-01-01").plusHours(i), i));
+ recordsA.add(record(topicA, "%s,tenant_x,eu-west,2,val_%d",
DateTimes.of("2025-01-01").plusHours(i), i + 100));
+ }
+ // Topic B: one tenant (tenant_b), one region (ap-south) with code 3
+ final List<ProducerRecord<byte[], byte[]>> recordsB = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ recordsB.add(record(topicB, "%s,tenant_b,ap-south,3,val_%d",
DateTimes.of("2025-01-01").plusHours(i), i + 200));
+ }
+ kafkaServer.produceRecordsToTopic(recordsA);
+ kafkaServer.produceRecordsToTopic(recordsB);
+
+ final KafkaSupervisorSpecBuilder multiDimBuilder = new
KafkaSupervisorSpecBuilder()
+ .withContext(Collections.singletonMap(Tasks.USE_CONCURRENT_LOCKS,
true))
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(new TimestampSpec(COL_TIMESTAMP, null, null))
+ .withDimensions(
+ DimensionsSpec.builder()
+ .setDimensions(List.of(
+ new StringDimensionSchema(COL_TENANT),
+ new StringDimensionSchema(colRegion),
+ new LongDimensionSchema(colRegionCode),
+ new StringDimensionSchema(COL_VALUE)
+ ))
+ .build()
+ )
+ )
+ .withTuningConfig(
+ t -> t.withMaxRowsPerSegment(1)
+ .withReleaseLocksOnHandoff(true)
+ // Track both a string dimension and a numeric dimension
+ .withStreamingPartitionsSpec(new
StreamingPartitionsSpec(List.of(COL_TENANT, colRegionCode)))
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withInputFormat(new CsvInputFormat(
+ List.of(COL_TIMESTAMP, COL_TENANT, colRegion,
colRegionCode, COL_VALUE),
+ null, null, false, 0, false
+ ))
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withTaskDuration(Period.millis(500))
+ .withStartDelay(Period.millis(10))
+ .withSupervisorRunPeriod(Period.millis(500))
+ .withCompletionTimeout(Period.seconds(5))
+ .withUseEarliestSequenceNumber(true)
+ );
+
+ cluster.callApi().postSupervisor(multiDimBuilder.withId(dataSource +
"_supe_a").build(dataSource, topicA));
+ cluster.callApi().postSupervisor(multiDimBuilder.withId(dataSource +
"_supe_b").build(dataSource, topicB));
+
+ awaitRowsProcessed(15); // 10 from A + 5 from B
+
+ // Total row count
+ Assertions.assertEquals("15", cluster.runSql("SELECT COUNT(*) FROM %s",
dataSource));
+
+ // String dimension — multiple values per topic
+ Assertions.assertEquals("5", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = 'tenant_a'", dataSource,
COL_TENANT));
+ Assertions.assertEquals("5", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = 'tenant_x'", dataSource,
COL_TENANT));
+ Assertions.assertEquals("5", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = 'tenant_b'", dataSource,
COL_TENANT));
+
+ // Numeric dimension equality filter still returns correct counts. Note
this does NOT prune (numeric filters
+ // opt out of segment pruning); pruning behavior is asserted in
test_numericDimension_isNotPruned.
+ Assertions.assertEquals("5", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = 1", dataSource, colRegionCode));
+ Assertions.assertEquals("5", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = 2", dataSource, colRegionCode));
+ Assertions.assertEquals("5", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = 3", dataSource, colRegionCode));
+
+ // Suspend, publish, and verify historical queries give the same counts.
+ suspendAndAwaitHandoff(multiDimBuilder.withId(dataSource +
"_supe_a").build(dataSource, topicA), 1);
+ suspendAndAwaitHandoff(multiDimBuilder.withId(dataSource +
"_supe_b").build(dataSource, topicB), 2);
+
+ Assertions.assertEquals("15", cluster.runSql("SELECT COUNT(*) FROM %s",
dataSource));
+ Assertions.assertEquals("5", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = 1", dataSource, colRegionCode));
+ Assertions.assertEquals("5", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = 'tenant_b'", dataSource,
COL_TENANT));
+
+ // Verify sys.segments: all published segments carry dim_value_set shard specs that declare at least one tracked dim
+ verifyAllSegmentsHaveDimensionValueSetShardSpec(dataSource);
+ final List<Map<String, Object>> shardSpecs = getShardSpecs(dataSource);
+ for (Map<String, Object> spec : shardSpecs) {
+ @SuppressWarnings("unchecked")
+ final Map<String, List<String>> filters = (Map<String, List<String>>)
spec.get("partitionDimensionValues");
+ Assertions.assertTrue(
+ filters.containsKey(COL_TENANT) ||
filters.containsKey(colRegionCode),
+ "Expected at least one tracked dimension in
partitionDimensionValues: " + filters
+ );
+ }
+ }
+
+ /**
+ * Numeric (Long) tracked dimensions are recorded in the shard spec but are
NOT used for pruning: numeric query
+ * filters opt out of segment pruning (their getDimensionRangeSet returns
null, since pruning compares values
+ * lexicographically), so a numeric equality filter scans every segment even
though each segment declares exactly
+ * one numeric value. Queries stay correct; there is simply no pruning
benefit.
+ *
+ * <p>This is intentional for now. We could either (a) extend pruning to numeric types with type-aware
+ * (non-lexicographic) comparison, or (b) reject numeric dimensions outright when they're declared.
+ */
+ @Test
+ public void test_numericDimension_isNotPruned()
+ {
+ final String colCode = "code"; // numeric (Long), tracked
+ final String topic = dataSource + "_topic";
+ kafkaServer.createTopicWithPartitions(topic, 1);
+
+ // One distinct numeric code per DAY segment: Day1=1, Day2=2, Day3=3. If
numeric pruning worked, "code = 1" would
+ // scan only Day1; because it does NOT, all three segments are scanned.
+ final List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+ records.add(record(topic, "%s,1,val_0",
DateTimes.of("2025-01-01T01:00:00")));
+ records.add(record(topic, "%s,1,val_1",
DateTimes.of("2025-01-01T02:00:00")));
+ records.add(record(topic, "%s,2,val_2",
DateTimes.of("2025-01-02T01:00:00")));
+ records.add(record(topic, "%s,2,val_3",
DateTimes.of("2025-01-02T02:00:00")));
+ records.add(record(topic, "%s,3,val_4",
DateTimes.of("2025-01-03T01:00:00")));
+ records.add(record(topic, "%s,3,val_5",
DateTimes.of("2025-01-03T02:00:00")));
+ kafkaServer.produceRecordsToTopic(records);
+
+ final KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder()
+ .withContext(Collections.singletonMap(Tasks.USE_CONCURRENT_LOCKS,
true))
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(new TimestampSpec(COL_TIMESTAMP, null, null))
+ .withDimensions(
+ DimensionsSpec.builder()
+ .setDimensions(List.of(
+ new LongDimensionSchema(colCode),
+ new StringDimensionSchema(COL_VALUE)
+ ))
+ .build()
+ )
+ .withGranularity(new UniformGranularitySpec(Granularities.DAY,
Granularities.NONE, null))
+ )
+ .withTuningConfig(
+ t -> t.withMaxRowsPerSegment(1000)
+ .withReleaseLocksOnHandoff(true)
+ .withStreamingPartitionsSpec(new
StreamingPartitionsSpec(List.of(colCode)))
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withInputFormat(new CsvInputFormat(
+ List.of(COL_TIMESTAMP, colCode, COL_VALUE), null, null,
false, 0, false))
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withTaskDuration(Period.millis(500))
+ .withStartDelay(Period.millis(10))
+ .withSupervisorRunPeriod(Period.millis(500))
+ .withCompletionTimeout(Period.seconds(5))
+ .withUseEarliestSequenceNumber(true)
+ )
+ .withId(dataSource + "_supe")
+ .build(dataSource, topic);
+
+ cluster.callApi().postSupervisor(spec);
+ awaitRowsProcessed(6);
+ suspendAndAwaitHandoff(spec, 1);
+
+ // The numeric value IS recorded in the shard spec (stringified), even
though it is never used for pruning.
+ verifyAllSegmentsHaveDimensionValueSetShardSpec(dataSource);
+
+ final Map<String, String> startToSegmentId =
getStartToSegmentId(dataSource);
+ final String day1 = startToSegmentId.get("2025-01-01T00:00:00.000Z");
+ final String day2 = startToSegmentId.get("2025-01-02T00:00:00.000Z");
+ final String day3 = startToSegmentId.get("2025-01-03T00:00:00.000Z");
+ final Set<String> allDays = Set.of(day1, day2, day3);
+
+ // Correct count, but ALL segments scanned: numeric equality does not
prune even though each segment holds one code.
+ assertScan("2", allDays, "SELECT COUNT(*) FROM %s WHERE %s = 1",
dataSource, colCode);
+ assertScan("2", allDays, "SELECT COUNT(*) FROM %s WHERE %s = 3",
dataSource, colCode);
+ // A non-existent code returns 0 rows but is still NOT pruned (would be a
full prune if numeric pruning worked).
+ assertScan("0", allDays, "SELECT COUNT(*) FROM %s WHERE %s = 999",
dataSource, colCode);
+
+ // Strict correctness: the RIGHT rows survive, not just the right count.
+ assertValues(Set.of("val_0", "val_1"), "SELECT \"%s\" FROM %s WHERE %s =
1", COL_VALUE, dataSource, colCode);
+ assertValues(Set.of("val_4", "val_5"), "SELECT \"%s\" FROM %s WHERE %s =
3", COL_VALUE, dataSource, colCode);
+ }
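The Javadoc above attributes the lack of numeric pruning to lexicographic comparison. A minimal, self-contained sketch of why that matters follows; it is a toy illustration rather than Druid code, and the class and method names (`LexicographicVsNumeric`, `lexLess`, `inStringRange`) are hypothetical. It shows that a string range built from stringified numbers admits values that a numeric range would exclude.

```java
import java.util.List;
import java.util.stream.Collectors;

public class LexicographicVsNumeric
{
  /** True if a sorts strictly before b when compared as strings. */
  public static boolean lexLess(String a, String b)
  {
    return a.compareTo(b) < 0;
  }

  /** Values whose stringified form falls in the closed string range [lo, hi]. */
  public static List<Long> inStringRange(List<Long> values, String lo, String hi)
  {
    return values.stream()
                 .filter(v -> {
                   final String s = String.valueOf(v);
                   return s.compareTo(lo) >= 0 && s.compareTo(hi) <= 0;
                 })
                 .collect(Collectors.toList());
  }

  public static void main(String[] args)
  {
    // "10" sorts before "9" as a string, although 10 > 9 numerically.
    System.out.println(lexLess("10", "9")); // true

    // Numerically, [1, 3] holds only 1, 2, 3. The string range ["1", "3"] also admits 10 and 25.
    System.out.println(inStringRange(List.of(1L, 2L, 3L, 10L, 25L, 30L), "1", "3")); // [1, 2, 3, 10, 25]
  }
}
```

Because string order and numeric order disagree, a range-based pruner that compares stringified values could not safely answer numeric range filters, which is consistent with the test's expectation that numeric filters scan every segment.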
+
+ @Test
+ public void test_incorrectValuesInTopic_inMemory_handledGracefully()
+ {
+ final String topic = dataSource + "_topic";
+ kafkaServer.createTopicWithPartitions(topic, 1);
+
+ // Mixed rows on a single topic — tenant_a and tenant_b both present
(simulates producer bug
+ // or single-topic multi-tenant setup).
+ final List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+ for (int i = 0; i < 5; i++) {
+ records.add(record(topic, "%s,tenant_a,val_%d",
DateTimes.of("2025-01-01").plusHours(i), i));
+ }
+ for (int i = 0; i < 5; i++) {
+ records.add(record(topic, "%s,tenant_b,val_%d",
DateTimes.of("2025-01-01").plusHours(i + 5), i + 100));
+ }
+ kafkaServer.produceRecordsToTopic(records);
+
+ // Use a long task duration and large segment thresholds so nothing gets
pushed to deep storage
+ // while we run the in-memory correctness assertions.
+ final KafkaSupervisorSpec spec = new KafkaSupervisorSpecBuilder()
+ .withContext(Collections.singletonMap(Tasks.USE_CONCURRENT_LOCKS,
true))
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(new TimestampSpec(COL_TIMESTAMP, null, null))
+ .withDimensions(
+ DimensionsSpec.builder()
+ .setDimensions(List.of(
+ new StringDimensionSchema(COL_TENANT),
+ new StringDimensionSchema(COL_VALUE)
+ ))
+ .build()
+ )
+ )
+ .withTuningConfig(
+ tuningConfig -> tuningConfig
+ // Large enough that 10 rows never trigger a segment push
+ .withMaxRowsPerSegment(100_000)
+ // Long enough that time-based persist/push doesn't fire
during the test
+ .withIntermediatePersistPeriod(Period.hours(1))
+ .withStreamingPartitionsSpec(new
StreamingPartitionsSpec(List.of(COL_TENANT)))
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withInputFormat(new CsvInputFormat(
+ List.of(COL_TIMESTAMP, COL_TENANT, COL_VALUE),
+ null, null, false, 0, false
+ ))
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ // Long task duration — task stays alive and holds data in
memory
+ .withTaskDuration(Period.minutes(5))
+ .withStartDelay(Period.millis(10))
+ .withSupervisorRunPeriod(Period.millis(500))
+ .withCompletionTimeout(Period.seconds(30))
+ .withUseEarliestSequenceNumber(true)
+ )
+ .withId(dataSource + "_supe")
+ .build(dataSource, topic);
+
+ cluster.callApi().postSupervisor(spec);
+ awaitRowsProcessed(10);
+
+ // In-memory correctness: broker fans out to the task (NumberedShardSpec,
no pruning), all rows returned.
+ assertMixedTenantCounts();
+
+ suspendAndAwaitHandoff(spec, 1);
+
+ // After publish the DimensionValueSetShardSpec carries both observed
tenants; historical queries stay correct.
+ assertMixedTenantCounts();
+
+ // Verify sys.segments: published segments carry dim_value_set with both
tenant values
+ verifyAllSegmentsHaveDimensionValueSetShardSpec(dataSource);
+ final List<Map<String, Object>> publishedShardSpecs =
getShardSpecs(dataSource);
+ for (Map<String, Object> shardSpec : publishedShardSpecs) {
+ @SuppressWarnings("unchecked")
+ final List<String> tenantValues = ((Map<String, List<String>>)
shardSpec.get("partitionDimensionValues")).get(COL_TENANT);
+ Assertions.assertEquals(List.of(TENANT_A, TENANT_B), tenantValues,
"Expected tenant dimension in partitionDimensionValues");
+ }
+ }
+
+ /**
+ * One tenant per DAY segment: each segment's {@code
partitionDimensionValues} must contain ONLY that segment's tenant, not
+ * the union of all tenants the task saw.
+ */
+ @Test
+ public void test_perSegmentTracking_shardSpecContainsOnlySegmentValues()
+ {
+ final String topic = dataSource + "_topic";
+ kafkaServer.createTopicWithPartitions(topic, 1);
+
+ final String tenantC = "tenant_c";
+ final String tenantD = "tenant_d";
+
+ // 4 tenants × 2 rows each = 8 rows, one tenant per day → 4 segments (DAY
granularity).
+ final List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+ records.add(record(topic, "%s,tenant_a,val_0",
DateTimes.of("2025-01-01T01:00:00")));
+ records.add(record(topic, "%s,tenant_a,val_1",
DateTimes.of("2025-01-01T02:00:00")));
+ records.add(record(topic, "%s,tenant_b,val_2",
DateTimes.of("2025-01-02T01:00:00")));
+ records.add(record(topic, "%s,tenant_b,val_3",
DateTimes.of("2025-01-02T02:00:00")));
+ records.add(record(topic, "%s,tenant_c,val_4",
DateTimes.of("2025-01-03T01:00:00")));
+ records.add(record(topic, "%s,tenant_c,val_5",
DateTimes.of("2025-01-03T02:00:00")));
+ records.add(record(topic, "%s,tenant_d,val_6",
DateTimes.of("2025-01-04T01:00:00")));
+ records.add(record(topic, "%s,tenant_d,val_7",
DateTimes.of("2025-01-04T02:00:00")));
+ kafkaServer.produceRecordsToTopic(records);
+
+ final KafkaSupervisorSpec spec = dayGranularitySupervisor(topic,
List.of(COL_TENANT));
+ cluster.callApi().postSupervisor(spec);
+ awaitRowsProcessed(8);
+ suspendAndAwaitHandoff(spec, 1);
+
+ // Basic correctness: all 8 rows are queryable
+ Assertions.assertEquals("8", cluster.runSql("SELECT COUNT(*) FROM %s",
dataSource));
+ Assertions.assertEquals("2", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = '%s'", dataSource, COL_TENANT,
TENANT_A));
+ Assertions.assertEquals("2", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = '%s'", dataSource, COL_TENANT,
tenantC));
+
+ // Key assertion: each segment carries ONLY the tenant value(s) observed
in that segment,
+ // NOT the union of all values seen by the task.
+ verifyAllSegmentsHaveDimensionValueSetShardSpec(dataSource);
+ final List<Map<String, Object>> shardSpecs = getShardSpecs(dataSource);
+ Assertions.assertEquals(4, shardSpecs.size(),
+ "Expected exactly 4 segments (DAY granularity, 4 days) but got " +
shardSpecs.size());
+
+ for (Map<String, Object> shardSpec : shardSpecs) {
+ @SuppressWarnings("unchecked")
+ final List<String> tenantValues =
+ ((Map<String, List<String>>)
shardSpec.get("partitionDimensionValues")).get(COL_TENANT);
+ Assertions.assertNotNull(tenantValues, "Expected tenant in
partitionDimensionValues: " + shardSpec);
+ // Each segment should have exactly 1 tenant (one day = one tenant)
+ Assertions.assertEquals(
+ 1,
+ tenantValues.size(),
+ StringUtils.format(
+ "Expected exactly 1 tenant per segment's
partitionDimensionValues but got %s",
+ tenantValues
+ )
+ );
+ }
+
+ // Collect all tenants across all segments — the union should still cover
all 4
+ final Set<String> allTenants = shardSpecs.stream()
+ .map(s -> {
+ @SuppressWarnings("unchecked")
+ final List<String> vals = ((Map<String, List<String>>)
s.get("partitionDimensionValues")).get(COL_TENANT);
+ return vals;
+ })
+ .flatMap(List::stream)
+ .collect(Collectors.toSet());
+ Assertions.assertEquals(
+ Set.of(TENANT_A, TENANT_B, tenantC, tenantD),
+ allTenants,
+ "Union of all segments' partitionDimensionValues should cover all 4
tenants"
+ );
+ }
+
+ @Test
+ public void test_pruning_verifiedBySegmentScanMetric()
+ {
+ final String topic = dataSource + "_topic";
+ kafkaServer.createTopicWithPartitions(topic, 1);
+
+ // Tenant-per-DAY layout, 5 day-segments, 10 rows. Day3 repeats tenant_a
(two-day match);
+ // Day4 is multi-value {tenant_c,tenant_d}; Day5 is null tenant (empty CSV
field).
+ final List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+ records.add(record(topic, "%s,tenant_a,val_0",
DateTimes.of("2025-01-01T01:00:00")));
+ records.add(record(topic, "%s,tenant_a,val_1",
DateTimes.of("2025-01-01T02:00:00")));
+ records.add(record(topic, "%s,tenant_b,val_2",
DateTimes.of("2025-01-02T01:00:00")));
+ records.add(record(topic, "%s,tenant_b,val_3",
DateTimes.of("2025-01-02T02:00:00")));
+ records.add(record(topic, "%s,tenant_a,val_4",
DateTimes.of("2025-01-03T01:00:00")));
+ records.add(record(topic, "%s,tenant_a,val_5",
DateTimes.of("2025-01-03T02:00:00")));
+ records.add(record(topic, "%s,tenant_c,val_6",
DateTimes.of("2025-01-04T01:00:00")));
+ records.add(record(topic, "%s,tenant_d,val_7",
DateTimes.of("2025-01-04T02:00:00")));
+ records.add(record(topic, "%s,,val_8",
DateTimes.of("2025-01-05T01:00:00")));
+ records.add(record(topic, "%s,,val_9",
DateTimes.of("2025-01-05T02:00:00")));
+ kafkaServer.produceRecordsToTopic(records);
+
+ // DAY granularity, maxRowsPerSegment large so Day4 stays ONE multi-value
segment.
+ final KafkaSupervisorSpec spec = dayGranularitySupervisor(topic,
List.of(COL_TENANT));
+ cluster.callApi().postSupervisor(spec);
+ awaitRowsProcessed(10);
+ suspendAndAwaitHandoff(spec, 1); // publish + hand off all 5 segments to
the historical
+
+ // Sanity: every published segment is dim_value_set (the count of 5 is asserted below).
+ verifyAllSegmentsHaveDimensionValueSetShardSpec(dataSource);
+
+ // Build day(startIso) → segmentId map from sys.segments. The segment
version is assigned at publish
+ // time, so the expected segment ids MUST be read here and never
hand-constructed.
+ final Map<String, String> startToSegmentId =
getStartToSegmentId(dataSource);
+ Assertions.assertEquals(
+ 5,
+ startToSegmentId.size(),
+ "Expected exactly 5 day segments (1:1 start→segment) but got " +
startToSegmentId
+ );
+
+ final String day1 = startToSegmentId.get("2025-01-01T00:00:00.000Z"); //
tenant_a
+ final String day2 = startToSegmentId.get("2025-01-02T00:00:00.000Z"); //
tenant_b
+ final String day3 = startToSegmentId.get("2025-01-03T00:00:00.000Z"); //
tenant_a
+ final String day4 = startToSegmentId.get("2025-01-04T00:00:00.000Z"); //
{tenant_c,tenant_d}
+ final String day5 = startToSegmentId.get("2025-01-05T00:00:00.000Z"); //
null
+ for (String id : List.of(day1, day2, day3, day4, day5)) {
+ Assertions.assertNotNull(id, "Missing day segment id in: " +
startToSegmentId);
+ }
+
+ // Positive control: no tenant filter → no pruning → all 5 scanned (proves
the harness sees the full set).
+ assertScan("10", Set.of(day1, day2, day3, day4, day5), "SELECT COUNT(*)
FROM %s", dataSource);
+
+ // '=' matches both tenant_a days.
+ assertScan("4", Set.of(day1, day3),
+ "SELECT COUNT(*) FROM %s WHERE %s = 'tenant_a'", dataSource,
COL_TENANT);
+
+ // 'IN' matches tenant_a days plus the multi-value Day4 (via tenant_c).
+ assertScan("5", Set.of(day1, day3, day4),
+ "SELECT COUNT(*) FROM %s WHERE %s IN ('tenant_a','tenant_c')",
dataSource, COL_TENANT);
+
+ // 'IS NULL' (domain (-inf,"")) matches only the null Day5.
+ assertScan("2", Set.of(day5),
+ "SELECT COUNT(*) FROM %s WHERE %s IS NULL", dataSource,
COL_TENANT);
+
+ // 'IS NOT NULL' (complement ["",+inf)) prunes the null Day5.
+ assertScan("8", Set.of(day1, day2, day3, day4),
+ "SELECT COUNT(*) FROM %s WHERE %s IS NOT NULL", dataSource,
COL_TENANT);
+
+ // '<>' scans everything except the tenant_a days. Scan≠count: the null
Day5 is scanned (its null falls in the
+ // complement (-inf,'tenant_a')) but contributes 0 rows under three-valued
logic, so count=4.
+ assertScan("4", Set.of(day2, day4, day5),
+ "SELECT COUNT(*) FROM %s WHERE %s <> 'tenant_a'", dataSource,
COL_TENANT);
+
+ // 'NOT IN' all non-null values: only the null Day5 survives the
complement; count=0 (nulls dropped under 3VL).
+ assertScan("0", Set.of(day5),
+ "SELECT COUNT(*) FROM %s WHERE %s NOT IN
('tenant_a','tenant_b','tenant_c','tenant_d')",
+ dataSource, COL_TENANT);
+
+ // Untracked dimension: pruning is scoped to 'tenant' (getDomainDimensions
returns only it), so a 'value' filter
+ // prunes nothing — all 5 scanned, filter still applied (1 row). "value"
is reserved in Calcite, hence quoted.
+ assertScan("1", Set.of(day1, day2, day3, day4, day5),
+ "SELECT COUNT(*) FROM %s WHERE \"%s\" = 'val_0'", dataSource,
COL_VALUE);
+
+ // Strict correctness on some predicates: assert the actual surviving
rows, not just their count.
+ // 'IN' spans tenant_a (val_0,val_1,val_4,val_5) plus tenant_c on the
multi-value Day4 (val_6) — but NOT tenant_d's
+ // val_7, even though val_7 lives in the same Day4 segment. This is what
COUNT alone cannot prove.
+ assertValues(Set.of("val_0", "val_1", "val_4", "val_5", "val_6"),
+ "SELECT \"%s\" FROM %s WHERE %s IN ('tenant_a','tenant_c')",
COL_VALUE, dataSource, COL_TENANT);
+ // 'IS NULL' returns only the two null-tenant Day5 rows.
+ assertValues(Set.of("val_8", "val_9"),
+ "SELECT \"%s\" FROM %s WHERE %s IS NULL", COL_VALUE,
dataSource, COL_TENANT);
+
+ // Non-existent value → full prune (no segment reaches the historical).
Verified via the sentinel fence.
+ assertFullPruneWithSentinel(
+ StringUtils.format("SELECT COUNT(*) FROM %s WHERE %s = 'tenant_zzz'",
dataSource, COL_TENANT),
+ StringUtils.format("SELECT COUNT(*) FROM %s WHERE %s = 'tenant_b'",
dataSource, COL_TENANT),
+ Set.of(day2)
+ );
+ }
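The scan sets asserted in the test above can be reproduced with a toy model: a segment survives pruning if any value it declares falls inside the filter's domain. The sketch below is only an illustration under that assumption, not the actual `possibleInDomain` implementation; the class `ValueSetPruningSketch`, the `SEGMENTS` map, and the `survivors` helper are hypothetical, and the day-to-tenant layout is copied from the test data.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

public class ValueSetPruningSketch
{
  /** Hypothetical stand-in for each segment's declared tenant values (null is a declared value). */
  public static final Map<String, Set<String>> SEGMENTS = new LinkedHashMap<>();

  static {
    SEGMENTS.put("day1", new HashSet<>(Arrays.asList("tenant_a")));
    SEGMENTS.put("day2", new HashSet<>(Arrays.asList("tenant_b")));
    SEGMENTS.put("day3", new HashSet<>(Arrays.asList("tenant_a")));
    SEGMENTS.put("day4", new HashSet<>(Arrays.asList("tenant_c", "tenant_d")));
    SEGMENTS.put("day5", new HashSet<>(Arrays.asList((String) null)));
  }

  /** Keeps a segment if any declared value falls inside the filter's domain. */
  public static Set<String> survivors(Predicate<String> domain)
  {
    final Set<String> kept = new TreeSet<>();
    for (Map.Entry<String, Set<String>> e : SEGMENTS.entrySet()) {
      if (e.getValue().stream().anyMatch(domain)) {
        kept.add(e.getKey());
      }
    }
    return kept;
  }

  public static void main(String[] args)
  {
    // HashSet is used because it tolerates contains(null); Set.of(...) would throw.
    final Set<String> all = new HashSet<>(Arrays.asList("tenant_a", "tenant_b", "tenant_c", "tenant_d"));

    System.out.println(survivors(v -> "tenant_a".equals(v)));          // [day1, day3]
    System.out.println(survivors(v -> v == null));                     // [day5]
    System.out.println(survivors(v -> v != null));                     // [day1, day2, day3, day4]
    // The complement domain of '<>' contains null, so the null segment is still scanned.
    System.out.println(survivors(v -> !Objects.equals(v, "tenant_a"))); // [day2, day4, day5]
    System.out.println(survivors(v -> !all.contains(v)));              // [day5]
    System.out.println(survivors(v -> "tenant_zzz".equals(v)));        // []
  }
}
```

The model only decides which segments are scanned. Row-level filtering still runs on each scanned segment, which is why the `<>` and `NOT IN` cases scan the null Day5 yet count none of its rows.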
+
+ @Test
+ public void test_noPartitioning_scansAllSegments_unlikePruned()
+ {
+ final String topic = dataSource + "_topic";
+ kafkaServer.createTopicWithPartitions(topic, 1);
+
+ // IDENTICAL layout to test_pruning_verifiedBySegmentScanMetric: 5
day-segments, 10 rows.
+ final List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+ records.add(record(topic, "%s,tenant_a,val_0",
DateTimes.of("2025-01-01T01:00:00")));
+ records.add(record(topic, "%s,tenant_a,val_1",
DateTimes.of("2025-01-01T02:00:00")));
+ records.add(record(topic, "%s,tenant_b,val_2",
DateTimes.of("2025-01-02T01:00:00")));
+ records.add(record(topic, "%s,tenant_b,val_3",
DateTimes.of("2025-01-02T02:00:00")));
+ records.add(record(topic, "%s,tenant_a,val_4",
DateTimes.of("2025-01-03T01:00:00")));
+ records.add(record(topic, "%s,tenant_a,val_5",
DateTimes.of("2025-01-03T02:00:00")));
+ records.add(record(topic, "%s,tenant_c,val_6",
DateTimes.of("2025-01-04T01:00:00")));
+ records.add(record(topic, "%s,tenant_d,val_7",
DateTimes.of("2025-01-04T02:00:00")));
+ records.add(record(topic, "%s,,val_8",
DateTimes.of("2025-01-05T01:00:00")));
+ records.add(record(topic, "%s,,val_9",
DateTimes.of("2025-01-05T02:00:00")));
+ kafkaServer.produceRecordsToTopic(records);
+
+ // Same layout as the partitioned test but with NO streamingPartitionsSpec
→ plain NumberedShardSpec, no pruning.
+ final KafkaSupervisorSpec spec = dayGranularitySupervisor(topic,
List.of());
+ cluster.callApi().postSupervisor(spec);
+ awaitRowsProcessed(10);
+ suspendAndAwaitHandoff(spec, 1);
+
+ // Partitioning is OFF: every published segment must be a plain "numbered"
shard spec, NOT "dim_value_set".
+ verifyAllSegmentsHaveShardSpecType(dataSource, "numbered");
+
+ final Map<String, String> startToSegmentId =
getStartToSegmentId(dataSource);
+ Assertions.assertEquals(
+ 5,
+ startToSegmentId.size(),
+ "Expected exactly 5 day segments (1:1 start→segment) but got " +
startToSegmentId
+ );
+ // Day-position → segment id, so pruned day-sets from the partitioned test
can be compared across datasources.
+ final String day1 = startToSegmentId.get("2025-01-01T00:00:00.000Z");
+ final String day2 = startToSegmentId.get("2025-01-02T00:00:00.000Z");
+ final String day3 = startToSegmentId.get("2025-01-03T00:00:00.000Z");
+ final String day4 = startToSegmentId.get("2025-01-04T00:00:00.000Z");
+ final String day5 = startToSegmentId.get("2025-01-05T00:00:00.000Z");
+ final Set<String> allSegments = Set.of(day1, day2, day3, day4, day5);
+ Assertions.assertEquals(5, allSegments.size(), "Expected 5 distinct
segment ids: " + startToSegmentId);
+
+ // Same matrix as the partitioned test; arg 3 is the partitioned twin's
pruned day-set for each predicate.
+ assertScansAllStrictlyMoreThanPruned("10", allSegments, Set.of(day1, day2,
day3, day4, day5),
+ "SELECT COUNT(*) FROM %s", dataSource);
+ assertScansAllStrictlyMoreThanPruned("4", allSegments, Set.of(day1, day3),
+ "SELECT COUNT(*) FROM %s WHERE %s = 'tenant_a'",
dataSource, COL_TENANT);
+ assertScansAllStrictlyMoreThanPruned("5", allSegments, Set.of(day1, day3,
day4),
+ "SELECT COUNT(*) FROM %s WHERE %s IN
('tenant_a','tenant_c')", dataSource, COL_TENANT);
+ assertScansAllStrictlyMoreThanPruned("2", allSegments, Set.of(day5),
+ "SELECT COUNT(*) FROM %s WHERE %s IS NULL", dataSource,
COL_TENANT);
+ assertScansAllStrictlyMoreThanPruned("8", allSegments, Set.of(day1, day2,
day3, day4),
+ "SELECT COUNT(*) FROM %s WHERE %s IS NOT NULL", dataSource,
COL_TENANT);
+ assertScansAllStrictlyMoreThanPruned("4", allSegments, Set.of(day2, day4,
day5),
+ "SELECT COUNT(*) FROM %s WHERE %s <> 'tenant_a'",
dataSource, COL_TENANT);
+ assertScansAllStrictlyMoreThanPruned("0", allSegments, Set.of(day5),
+ "SELECT COUNT(*) FROM %s WHERE %s NOT IN
('tenant_a','tenant_b','tenant_c','tenant_d')",
+ dataSource, COL_TENANT);
+ // Untracked dimension: neither test prunes on 'value', so the pruned-set
equals the full set. "value" is quoted
+ // (reserved in Calcite).
+ assertScansAllStrictlyMoreThanPruned("1", allSegments, allSegments,
+ "SELECT COUNT(*) FROM %s WHERE \"%s\" = 'val_0'",
dataSource, COL_VALUE);
+ // Sharpest contrast: partitioned prunes this to ZERO segments;
unpartitioned still scans all 5.
+ assertScansAllStrictlyMoreThanPruned("0", allSegments, Set.of(),
+ "SELECT COUNT(*) FROM %s WHERE %s = 'tenant_zzz'",
dataSource, COL_TENANT);
+ }
+
+ /**
+ * No-partitioning-twin assertion: asserts COUNT, then (after the {@link
#assertScan} barrier) that all segments were
+ * scanned, and that {@code partitionedPrunedSet} is a proper subset of the
full set (or equal, for the no-prune
+ * control/untracked cases) — i.e. partitioning scans strictly fewer
segments.
+ */
+ private void assertScansAllStrictlyMoreThanPruned(
+ String expectedCount,
+ Set<String> expectedAll,
+ Set<String> partitionedPrunedSet,
+ String sqlTemplate,
+ Object... sqlArgs
+ )
+ {
+ final String sql = StringUtils.format(sqlTemplate, sqlArgs);
+
+ historical.latchableEmitter().flush();
+ Assertions.assertEquals(expectedCount, cluster.runSql(sqlTemplate,
sqlArgs), "Wrong COUNT for: " + sql);
+ historical.latchableEmitter().waitForEvent(
+ m -> m.hasMetricName("query/time")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+ );
+ Assertions.assertEquals(
+ expectedAll,
+ scannedSegments(dataSource),
+ "Without partitioning every predicate must scan ALL segments (no
pruning) for: " + sql
+ );
+
+ // Partitioned pruned-set must be a subset of the full set, and strictly
smaller for every prunable predicate.
+ Assertions.assertTrue(
+ expectedAll.containsAll(partitionedPrunedSet),
+ "Partitioned pruned-set must be a subset of the full segment set for:
" + sql
+ + " pruned=" + partitionedPrunedSet + " all=" + expectedAll
+ );
+ if (!partitionedPrunedSet.equals(expectedAll)) {
+ Assertions.assertTrue(
+ partitionedPrunedSet.size() < expectedAll.size(),
+ "Partitioning must scan strictly fewer segments than no-partitioning
for: " + sql
+ + " pruned=" + partitionedPrunedSet + " all=" + expectedAll
+ );
+ }
+ }
+
+
+ private void assertScan(String expectedCount, Set<String> expectedScanned,
String sqlTemplate, Object... sqlArgs)
+ {
+ historical.latchableEmitter().flush();
+ Assertions.assertEquals(
+ expectedCount,
+ cluster.runSql(sqlTemplate, sqlArgs),
+ "Wrong COUNT for: " + StringUtils.format(sqlTemplate, sqlArgs)
+ );
+ historical.latchableEmitter().waitForEvent(
+ m -> m.hasMetricName("query/time")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+ );
+ Assertions.assertEquals(
+ expectedScanned,
+ scannedSegments(dataSource),
+ "Wrong scanned-segment set for: " + StringUtils.format(sqlTemplate,
sqlArgs)
+ );
+ }
+
+ private void assertValues(Set<String> expectedValues, String sqlTemplate,
Object... sqlArgs)
+ {
+ final String csv = cluster.runSql(sqlTemplate, sqlArgs);
+ final Set<String> actual = Arrays.stream(csv.split("\n"))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toSet());
+ Assertions.assertEquals(expectedValues, actual, "Wrong value set for: " +
StringUtils.format(sqlTemplate, sqlArgs));
+ }
+
+ private void assertFullPruneWithSentinel(String prunedSql, String
sentinelSql, Set<String> sentinelExpected)
+ {
+ historical.latchableEmitter().flush();
+ // Pruned query must return 0 and (if pruning is correct) contact zero
historical segments.
+ Assertions.assertEquals("0", cluster.runSql(prunedSql), "Full-prune query
should return 0: " + prunedSql);
+ // Sentinel: scans a deterministic, non-empty historical set; its
query/time fence transitively fences any
+ // (erroneous) per-segment emit produced by the pruned query above (same
historical, same emitter).
+ cluster.runSql(sentinelSql);
+ historical.latchableEmitter().waitForEvent(
+ m -> m.hasMetricName("query/time")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource)
+ );
+ // If the pruned query wrongly scanned any segment, its id appears here as
an EXTRA element → equality fails.
+ Assertions.assertEquals(
+ sentinelExpected,
+ scannedSegments(dataSource),
+ "Full-prune query must scan zero segments; only the sentinel's
segments may appear. pruned=" + prunedSql
+ );
+ }
+
+ /**
+ * Reads {@code sys.segments} for the datasource and returns a map of
segment start time (ISO) → segment id,
+ * over the currently available, non-overshadowed segments. With one tenant
per DAY this is a 1:1 mapping.
+ */
+ private Map<String, String> getStartToSegmentId(String ds)
+ {
+ final String csv = cluster.runSql(
+ "SELECT segment_id, \"start\" FROM sys.segments"
+ + " WHERE datasource = '%s' AND is_overshadowed = 0 AND is_available =
1",
+ ds
+ );
+ final Map<String, String> startToSegmentId = new HashMap<>();
+ for (String line : csv.split("\n")) {
+ final String trimmed = line.trim();
+ if (trimmed.isEmpty()) {
+ continue;
+ }
+ // CSV columns: segment_id,start. The segment id uses underscores (never
commas), so split into 2.
+ final String[] cols = trimmed.split(",", 2);
+ Assertions.assertEquals(2, cols.length, "Unexpected sys.segments CSV
row: " + trimmed);
+ startToSegmentId.put(cols[1].trim(), cols[0].trim());
+ }
+ Assertions.assertFalse(startToSegmentId.isEmpty(), "Expected at least one
segment in sys.segments for " + ds);
+ return startToSegmentId;
+ }
+
+ private Set<String> scannedSegments(String ds)
+ {
+ return historical.latchableEmitter()
+ .getMetricEvents("query/segment/time")
+ .stream()
+ .filter(e -> ds.equals(e.getUserDims().get(DruidMetrics.DATASOURCE)))
+ .map(e -> (String) e.getUserDims().get("segment"))
+ .collect(Collectors.toSet());
+ }
+
+
+ @Test
+ public void test_nullValuedDimension_isNullQueryNotPruned()
+ {
+ final String topic = dataSource + "_topic";
+ kafkaServer.createTopicWithPartitions(topic, 1);
+
+ final List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+ // 5 rows with tenant_a
+ for (int i = 0; i < 5; i++) {
+ records.add(record(topic, "%s,tenant_a,val_%d",
DateTimes.of("2025-01-01").plusHours(i), i));
+ }
+ // 5 rows with an empty tenant field — CsvInputFormat parses the empty
column as a null dimension value
+ for (int i = 0; i < 5; i++) {
+ records.add(record(topic, "%s,,val_%d",
DateTimes.of("2025-01-01").plusHours(i + 5), i + 100));
+ }
+ kafkaServer.produceRecordsToTopic(records);
+
+ final KafkaSupervisorSpec spec = newSupervisorBuilder().withId(dataSource + "_supe").build(dataSource, topic);
+ cluster.callApi().postSupervisor(spec);
+ awaitRowsProcessed(10);
+
+ // Realtime correctness (5 tenant_a, 5 IS NULL).
+ assertNullTenantCounts();
+
+ suspendAndAwaitHandoff(spec, 1);
+
+ // Historical: the IS NULL query must still find the 5 null rows. If the
DimensionValueSetShardSpec failed to declare
+ // null, the broker would prune the segment here and return 0.
+ assertNullTenantCounts();
+
+ // The published shard spec declares the null value (serialized as a JSON
null in the tenant list).
+ verifyAllSegmentsHaveDimensionValueSetShardSpec(dataSource);
+ final List<Map<String, Object>> shardSpecs = getShardSpecs(dataSource);
+ final boolean someSpecDeclaresNull = shardSpecs.stream()
+ .map(shardSpec -> {
+ @SuppressWarnings("unchecked")
+ final List<String> vals = ((Map<String, List<String>>)
shardSpec.get("partitionDimensionValues")).get(COL_TENANT);
+ return vals;
+ })
+ .anyMatch(vals -> vals != null && vals.contains(null));
+ Assertions.assertTrue(
+ someSpecDeclaresNull,
+ "Expected at least one segment's partitionDimensionValues to declare a
null tenant value: " + shardSpecs
+ );
+ }
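The test above depends on null being a declared value that stays distinct from the empty string. The test comments describe the IS NULL domain as (-inf, "") and its complement as ["", +inf). The following sketch models those two domains with a nulls-first ordering; it assumes, for illustration only, that null sorts before every string, and the class and method names are hypothetical rather than part of Druid.

```java
import java.util.Comparator;

public class NullDomainSketch
{
  // Assumption for illustration: null sorts before every string, including "".
  private static final Comparator<String> NULLS_FIRST = Comparator.nullsFirst(Comparator.naturalOrder());

  /** IS NULL domain, modeled as the open range (-inf, ""). */
  public static boolean inIsNullDomain(String v)
  {
    return NULLS_FIRST.compare(v, "") < 0;
  }

  /** IS NOT NULL domain, modeled as the complement ["", +inf). */
  public static boolean inIsNotNullDomain(String v)
  {
    return NULLS_FIRST.compare(v, "") >= 0;
  }

  public static void main(String[] args)
  {
    System.out.println(inIsNullDomain(null));          // true
    System.out.println(inIsNullDomain(""));            // false: the empty string is not null
    System.out.println(inIsNotNullDomain(""));         // true
    System.out.println(inIsNotNullDomain("tenant_a")); // true
  }
}
```

Under this ordering a segment that declares only null falls inside the IS NULL domain and outside the IS NOT NULL domain, while a segment that declares the empty string does the opposite.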
+
+ private ProducerRecord<byte[], byte[]> record(String topic, String
csvTemplate, Object... args)
+ {
+ return new ProducerRecord<>(topic, 0, null,
StringUtils.toUtf8(StringUtils.format(csvTemplate, args)));
+ }
+
+ private void assertRowCounts()
+ {
+ Assertions.assertEquals(
+ String.valueOf(ROWS_PER_TENANT * 2), cluster.runSql("SELECT COUNT(*)
FROM %s", dataSource));
+ Assertions.assertEquals(
+ String.valueOf(ROWS_PER_TENANT),
+ cluster.runSql("SELECT COUNT(*) FROM %s WHERE %s = '%s'", dataSource,
COL_TENANT, TENANT_A));
+ Assertions.assertEquals(
+ String.valueOf(ROWS_PER_TENANT),
+ cluster.runSql("SELECT COUNT(*) FROM %s WHERE %s = '%s'", dataSource,
COL_TENANT, TENANT_B));
+ }
+
+ /** Asserts 10 total rows split 5/5 between tenant_a and tenant_b (the
mixed-tenant single-segment layout). */
+ private void assertMixedTenantCounts()
+ {
+ Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s",
dataSource));
+ Assertions.assertEquals("5", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = '%s'", dataSource, COL_TENANT,
TENANT_A));
+ Assertions.assertEquals("5", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = '%s'", dataSource, COL_TENANT,
TENANT_B));
+ }
+
+ private void assertNullTenantCounts()
+ {
+ Assertions.assertEquals("10", cluster.runSql("SELECT COUNT(*) FROM %s",
dataSource));
+ Assertions.assertEquals("5", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s = '%s'", dataSource, COL_TENANT,
TENANT_A));
+ Assertions.assertEquals("5", cluster.runSql(
+ "SELECT COUNT(*) FROM %s WHERE %s IS NULL", dataSource, COL_TENANT));
+ }
+
+ private void awaitRowsProcessed(long rows)
+ {
+ indexer.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("ingest/events/processed")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+ agg -> agg.hasSumAtLeast(rows)
+ );
+ }
+
+ private void suspendAndAwaitHandoff(KafkaSupervisorSpec spec, long handoffs)
+ {
+ cluster.callApi().postSupervisor(spec.createSuspendedSpec());
+ indexer.latchableEmitter().waitForEventAggregate(
+ event -> event.hasMetricName("ingest/handoff/count")
+ .hasDimension(DruidMetrics.DATASOURCE, dataSource),
+ agg -> agg.hasSumAtLeast(handoffs)
+ );
+ cluster.callApi().waitForAllSegmentsToBeAvailable(dataSource, coordinator,
broker);
+ }
+
+ private KafkaSupervisorSpecBuilder newSupervisorBuilder()
+ {
+ return new KafkaSupervisorSpecBuilder()
+ .withContext(Collections.singletonMap(Tasks.USE_CONCURRENT_LOCKS,
true))
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(new TimestampSpec(COL_TIMESTAMP, null, null))
+ .withDimensions(
+ DimensionsSpec.builder()
+ .setDimensions(List.of(
+ new StringDimensionSchema(COL_TENANT),
+ new StringDimensionSchema(COL_VALUE)
+ ))
+ .build()
+ )
+ )
+ .withTuningConfig(
+ tuningConfig -> tuningConfig
+ .withMaxRowsPerSegment(1)
+ .withReleaseLocksOnHandoff(true)
+ .withStreamingPartitionsSpec(new
StreamingPartitionsSpec(List.of(COL_TENANT)))
+ )
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withInputFormat(new CsvInputFormat(
+ List.of(COL_TIMESTAMP, COL_TENANT, COL_VALUE),
+ null, null, false, 0, false
+ ))
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withTaskDuration(Period.millis(500))
+ .withStartDelay(Period.millis(10))
+ .withSupervisorRunPeriod(Period.millis(500))
+ .withCompletionTimeout(Period.seconds(5))
+ .withUseEarliestSequenceNumber(true)
+ );
+ }
+
+ private KafkaSupervisorSpec dayGranularitySupervisor(String topic,
List<String> partitionFilterDims)
+ {
+ return new KafkaSupervisorSpecBuilder()
+ .withContext(Collections.singletonMap(Tasks.USE_CONCURRENT_LOCKS,
true))
+ .withDataSchema(
+ schema -> schema
+ .withTimestamp(new TimestampSpec(COL_TIMESTAMP, null, null))
+ .withDimensions(
+ DimensionsSpec.builder()
+ .setDimensions(List.of(
+ new StringDimensionSchema(COL_TENANT),
+ new StringDimensionSchema(COL_VALUE)
+ ))
+ .build()
+ )
+ .withGranularity(new UniformGranularitySpec(Granularities.DAY,
Granularities.NONE, null))
+ )
+ .withTuningConfig(t -> {
+ t.withMaxRowsPerSegment(1000).withReleaseLocksOnHandoff(true);
+ if (!partitionFilterDims.isEmpty()) {
+ t.withStreamingPartitionsSpec(new
StreamingPartitionsSpec(partitionFilterDims));
+ }
+ })
+ .withIoConfig(
+ ioConfig -> ioConfig
+ .withInputFormat(new CsvInputFormat(
+ List.of(COL_TIMESTAMP, COL_TENANT, COL_VALUE), null, null,
false, 0, false))
+ .withConsumerProperties(kafkaServer.consumerProperties())
+ .withTaskDuration(Period.millis(500))
+ .withStartDelay(Period.millis(10))
+ .withSupervisorRunPeriod(Period.millis(500))
+ .withCompletionTimeout(Period.seconds(5))
+ .withUseEarliestSequenceNumber(true)
+ )
+ .withId(dataSource + "_supe")
+ .build(dataSource, topic);
+ }
+
+ private List<ProducerRecord<byte[], byte[]>> generateRecords(
+ String topic,
+ String tenantValue,
+ int numRecords,
+ DateTime startTime
+ )
+ {
+ final List<ProducerRecord<byte[], byte[]>> records = new ArrayList<>();
+ for (int i = 0; i < numRecords; i++) {
+ final String csv = StringUtils.format(
+ "%s,%s,value_%d",
+ startTime.plusHours(i),
+ tenantValue,
+ i
+ );
+ records.add(new ProducerRecord<>(topic, 0, null,
StringUtils.toUtf8(csv)));
+ }
+ return records;
+ }
+
+ private void verifyAllSegmentsHaveDimensionValueSetShardSpec(String ds)
+ {
+ verifyAllSegmentsHaveShardSpecType(ds, "dim_value_set");
+ }
+
+ private void verifyAllSegmentsHaveShardSpecType(String ds, String
expectedType)
+ {
+ final List<Map<String, Object>> specs = getShardSpecs(ds);
+ for (Map<String, Object> spec : specs) {
+ Assertions.assertEquals(
+ expectedType,
+ spec.get("type"),
+ "Expected " + expectedType + " shard spec but got: " + spec
+ );
+ }
+ }
+
+ private List<Map<String, Object>> getShardSpecs(String ds)
+ {
+ final String csv = cluster.runSql(
+ "SELECT shard_spec FROM sys.segments"
+ + " WHERE datasource = '%s' AND is_overshadowed = 0 AND is_available = 1",
+ ds
+ );
+ final ObjectMapper mapper = new ObjectMapper();
+ final List<Map<String, Object>> specs = new ArrayList<>();
+ for (String line : csv.split("\n")) {
+ String trimmed = line.trim();
+ if (trimmed.isEmpty()) {
+ continue;
+ }
+ // runSql returns CSV-quoted JSON — strip surrounding quotes and
unescape doubled quotes
+ if (trimmed.startsWith("\"") && trimmed.endsWith("\"")) {
+ trimmed = StringUtils.replace(trimmed.substring(1, trimmed.length() -
1), "\"\"", "\"");
+ }
+ try {
+ specs.add(mapper.readValue(trimmed, new TypeReference<>() {}));
+ }
+ catch (Exception e) {
+ Assertions.fail("Failed to parse shard_spec JSON: " + trimmed + " — "
+ e.getMessage());
+ }
+ }
+ Assertions.assertFalse(specs.isEmpty(), "Expected at least one segment in sys.segments for " + ds);
+ return specs;
+ }
+}
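The getShardSpecs helper above undoes CSV quoting by hand before handing each row to Jackson. A minimal standalone sketch of that one step follows, assuming a cell is either unquoted or wrapped in exactly one pair of double quotes with inner quotes doubled. The class name, method name, and sample JSON are hypothetical, and the length guard is an addition that the helper in the patch does not have.

```java
public class CsvFieldUnquoteSketch
{
  /**
   * Strips one pair of surrounding double quotes and collapses doubled quotes.
   * Unquoted input is returned trimmed but otherwise unchanged.
   */
  static String unquoteCsvField(String field)
  {
    final String trimmed = field.trim();
    // The length check keeps a lone '"' from reaching substring(1, 0).
    if (trimmed.length() >= 2 && trimmed.startsWith("\"") && trimmed.endsWith("\"")) {
      return trimmed.substring(1, trimmed.length() - 1).replace("\"\"", "\"");
    }
    return trimmed;
  }

  public static void main(String[] args)
  {
    // Made-up cell in the shape runSql is described as returning for shard_spec.
    final String csvCell = "\"{\"\"type\"\":\"\"dim_value_set\"\",\"\"partitionNum\"\":0}\"";
    System.out.println(unquoteCsvField(csvCell));
  }
}
```

A full CSV parser would also handle embedded newlines and multi-column rows; the sketch only covers the single-column case the test relies on.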
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java
index 44113519c9c..0d1bb127136 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/supervisor/KafkaTuningConfigBuilder.java
@@ -20,6 +20,7 @@
package org.apache.druid.indexing.kafka.supervisor;
import org.apache.druid.indexing.common.task.TuningConfigBuilder;
+import org.apache.druid.indexing.seekablestream.StreamingPartitionsSpec;
import org.joda.time.Period;
/**
@@ -35,6 +36,7 @@ public class KafkaTuningConfigBuilder extends
TuningConfigBuilder<KafkaTuningCon
private Period offsetFetchPeriod;
private Period intermediateHandoffPeriod;
private Boolean releaseLocksOnHandoff;
+ private StreamingPartitionsSpec streamingPartitionsSpec;
public KafkaTuningConfigBuilder withIntermediatePersistPeriod(Period
intermediatePersistPeriod)
{
@@ -84,6 +86,12 @@ public class KafkaTuningConfigBuilder extends
TuningConfigBuilder<KafkaTuningCon
return this;
}
+ public KafkaTuningConfigBuilder
withStreamingPartitionsSpec(StreamingPartitionsSpec streamingPartitionsSpec)
+ {
+ this.streamingPartitionsSpec = streamingPartitionsSpec;
+ return this;
+ }
+
@Override
public KafkaSupervisorTuningConfig build()
{
@@ -113,7 +121,8 @@ public class KafkaTuningConfigBuilder extends
TuningConfigBuilder<KafkaTuningCon
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
- releaseLocksOnHandoff
+ releaseLocksOnHandoff,
+ streamingPartitionsSpec
);
}
}
diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
index 08a083fe112..3bbfc450dc3 100644
--- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
+++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/indexing/kafka/test/TestModifiedKafkaIndexTaskTuningConfig.java
@@ -82,6 +82,7 @@ public class TestModifiedKafkaIndexTaskTuningConfig extends
KafkaIndexTaskTuning
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
+ null,
null
);
this.extra = extra;
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
index 7209146aaf5..f3d1a98727c 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunner.java
@@ -88,12 +88,15 @@ import org.apache.druid.segment.realtime.ChatHandler;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.segment.realtime.appenderator.Appenderator;
import
org.apache.druid.segment.realtime.appenderator.AppenderatorDriverAddResult;
+import org.apache.druid.segment.realtime.appenderator.SegmentIdWithShardSpec;
import
org.apache.druid.segment.realtime.appenderator.SegmentsAndCommitMetadata;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderator;
import org.apache.druid.segment.realtime.appenderator.StreamAppenderatorDriver;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.DimensionValueSetShardSpec;
import org.apache.druid.utils.CollectionUtils;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.DateTime;
@@ -115,9 +118,11 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -251,6 +256,23 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
private final Map<PartitionIdType, Long> partitionsThroughput = new
HashMap<>();
+ /**
+ * Observed values per tracked dimension, keyed by segment identifier, used to stamp the {@link DimensionValueSetShardSpec}
+ * at publish time. A {@code null} element denotes an observed null/missing value (distinct from {@code ""}) so that
+ * {@code IS NULL} queries are not pruned. Inner sets permit null and are written by the run loop / read by the
+ * publish thread under their own monitor. Entries are cleared on successful publish; a publish failure is terminal
+ * for the task, so any remaining entries are reclaimed at task teardown rather than removed individually.
+ */
+ private final ConcurrentHashMap<SegmentId, Map<String, Set<String>>> observedPartitionDimValuesBySegment = new ConcurrentHashMap<>();
+
+ /**
+ * Segment identifiers restored from disk at startup (i.e. spanning a task restart). Their pre-restart rows are not
+ * re-read, so {@link #observedPartitionDimValuesBySegment} would under-include values; to avoid wrongly pruning them,
+ * such segments are published with an empty-filter (non-pruning) {@link DimensionValueSetShardSpec} instead of one
+ * declaring observed values.
+ */
+ private final Set<SegmentId> restartSpannedSegments = Sets.newConcurrentHashSet();
+
private volatile DateTime minMessageTime;
private volatile DateTime maxMessageTime;
private final ScheduledExecutorService rejectionPeriodUpdaterExec;
@@ -452,6 +474,9 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
//milliseconds waited for created segments to be handed off
long handoffWaitMs = 0L;
+ final List<String> partitionDimensions =
+ StreamingPartitionsSpec.getPartitionDimensionsOrEmpty(tuningConfig.getStreamingPartitionsSpec());
+
try (final RecordSupplier<PartitionIdType, SequenceOffsetType, RecordType>
recordSupplier =
task.newTaskRecordSupplier(toolbox)) {
this.recordSupplier = recordSupplier;
@@ -495,6 +520,23 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
}
}
);
+
+ // Segments restored from disk span a task restart; their pre-restart values can't be re-observed, so record them
+ // to fall back to an empty-filter (non-pruning) DimensionValueSetShardSpec at publish rather than stamping an
+ // incomplete filter.
+ if (!partitionDimensions.isEmpty()) {
+ for (SegmentIdWithShardSpec restored : appenderator.getSegments()) {
+ restartSpannedSegments.add(restored.asSegmentId());
+ }
+ if (!restartSpannedSegments.isEmpty()) {
+ log.warn(
+ "Disabling partition-filter pruning for %d segment(s) restored across a task restart: %s",
+ restartSpannedSegments.size(),
+ restartSpannedSegments
+ );
+ }
+ }
+
if (restoredMetadata == null) {
// no persist has happened so far
// so either this is a brand new task or replacement of a failed task
@@ -693,6 +735,27 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
);
if (addResult.isOk()) {
+ // Accumulate observed dimension values per segment for DimensionValueSetShardSpec at publish time.
+ if (!partitionDimensions.isEmpty()) {
+ final SegmentId segmentId = addResult.getSegmentIdentifier().asSegmentId();
+ final Map<String, Set<String>> segValues = observedPartitionDimValuesBySegment
+ .computeIfAbsent(segmentId, k -> new ConcurrentHashMap<>());
+ for (String dim : partitionDimensions) {
+ final Set<String> dimSet = segValues.computeIfAbsent(
+ dim,
+ k -> Collections.synchronizedSet(new HashSet<>())
+ );
+ // Empty getDimension result means a null/missing value; record null so IS NULL is not pruned
+ // (distinct from "", which getDimension returns as [""]).
+ final List<String> dimValues = row.getDimension(dim);
+ if (dimValues == null || dimValues.isEmpty()) {
+ dimSet.add(null);
+ } else {
+ dimSet.addAll(dimValues);
+ }
+ }
+ }
+
// If the number of rows in the segment exceeds the threshold after adding a row,
// move the segment out from the active segments of BaseAppenderatorDriver to make a new segment.
final boolean isPushRequired = addResult.isPushRequired(
@@ -999,15 +1062,82 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
handOffWaitList.removeAll(handoffFinished);
}
+ @VisibleForTesting
+ void recordObservedDimensionValueForTest(SegmentId segmentId, String
dimension, @Nullable String value)
+ {
+ observedPartitionDimValuesBySegment
+ .computeIfAbsent(segmentId, k -> new ConcurrentHashMap<>())
+ .computeIfAbsent(dimension, k -> Collections.synchronizedSet(new
HashSet<>()))
+ .add(value);
+ }
+
+ @VisibleForTesting
+ void markSegmentRestartSpannedForTest(SegmentId segmentId)
+ {
+ restartSpannedSegments.add(segmentId);
+ }
+
+ /**
+ * Stamps a segment with a {@link DimensionValueSetShardSpec} declaring its observed dimension values so the broker can
+ * prune it. When the feature is on we always return a {@link DimensionValueSetShardSpec}, falling back to an empty
+ * (non-pruning) filter map when values can't be safely declared, so segments in an interval stay class-uniform for
+ * {@link org.apache.druid.segment.realtime.appenderator.SegmentPublisherHelper}. A null observed value is carried
+ * through (distinct from {@code ""}) so {@code IS NULL} queries are not pruned.
+ */
+ @VisibleForTesting
+ DataSegment annotateSegmentWithPartitionDimensionValues(DataSegment s)
+ {
+ final List<String> partitionDimensions =
+ StreamingPartitionsSpec.getPartitionDimensionsOrEmpty(tuningConfig.getStreamingPartitionsSpec());
+ if (CollectionUtils.isNullOrEmpty(partitionDimensions)) {
+ return s;
+ }
+ final Map<String, List<String>> snapshotFilters = new HashMap<>();
+ final SegmentId lookupKey = s.getId();
+ final Map<String, Set<String>> segObserved =
observedPartitionDimValuesBySegment.get(lookupKey);
+ // Leave filters empty for restart-spanned segments: their pre-restart values can't be re-observed.
+ if (!restartSpannedSegments.contains(lookupKey) && segObserved != null) {
+ for (String dim : partitionDimensions) {
+ final Set<String> vals = segObserved.get(dim);
+ if (vals == null) {
+ continue;
+ }
+ // vals is a synchronized set written by the run loop; copy it under its monitor to iterate safely.
+ final List<String> snapshot;
+ synchronized (vals) {
+ if (vals.isEmpty()) {
+ continue;
+ }
+ snapshot = new ArrayList<>(vals);
+ }
+ // Sort for deterministic published metadata; null (missing value) sorts first.
+ snapshot.sort(Comparator.nullsFirst(Comparator.naturalOrder()));
+ snapshotFilters.put(dim, snapshot);
+ }
+ }
+ return s.withShardSpec(
+ new DimensionValueSetShardSpec(
+ s.getShardSpec().getPartitionNum(),
+ s.getShardSpec().getNumCorePartitions(),
+ snapshotFilters
+ )
+ );
+ }
+
private void publishAndRegisterHandoff(SequenceMetadata<PartitionIdType,
SequenceOffsetType> sequenceMetadata)
{
log.debug("Publishing segments for sequence [%s].", sequenceMetadata);
+ // annotateSegmentWithPartitionDimensionValues returns the segment unchanged when partition filters are not configured,
+ // so it is always safe to apply here.
final ListenableFuture<SegmentsAndCommitMetadata> publishFuture =
Futures.transform(
driver.publish(
sequenceMetadata.createPublisher(this, toolbox,
ioConfig.isUseTransaction()),
sequenceMetadata.getCommitterSupplier(this, stream,
lastPersistedOffsets).get(),
- Collections.singletonList(sequenceMetadata.getSequenceName())
+ Collections.singletonList(sequenceMetadata.getSequenceName()),
+ segments -> segments.stream()
+ .map(this::annotateSegmentWithPartitionDimensionValues)
+ .collect(Collectors.toCollection(LinkedHashSet::new))
),
publishedSegmentsAndMetadata -> {
if (publishedSegmentsAndMetadata == null) {
@@ -1042,6 +1172,12 @@ public abstract class
SeekableStreamIndexTaskRunner<PartitionIdType, SequenceOff
);
log.infoSegments(publishedSegmentsAndCommitMetadata.getSegments(),
"Published segments");
+ for (DataSegment segment :
publishedSegmentsAndCommitMetadata.getSegments()) {
+ final SegmentId segmentId = segment.getId();
+ observedPartitionDimValuesBySegment.remove(segmentId);
+ restartSpannedSegments.remove(segmentId);
+ }
+
publishedSequences.add(sequenceMetadata.getSequenceName());
removeSequence(sequenceMetadata);
publishingSequences.remove(sequenceMetadata.getSequenceName());
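The runner changes above boil down to three steps: accumulate values per segment and dimension while rows are added, snapshot them in sorted order at publish, and publish an empty filter map for restart-spanned segments. The following is a simplified, standalone sketch of that bookkeeping, not the runner itself. Segment identifiers are plain strings here, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class ObservedValuesSketch
{
  // segment key -> dimension -> observed values; inner sets may contain null (missing value).
  private final Map<String, Map<String, Set<String>>> observed = new ConcurrentHashMap<>();
  private final Set<String> restartSpanned = ConcurrentHashMap.newKeySet();

  /** Records one row's values for a dimension; an empty or null list is recorded as null. */
  void record(String segmentKey, String dimension, List<String> rowValues)
  {
    final Set<String> values = observed
        .computeIfAbsent(segmentKey, k -> new ConcurrentHashMap<>())
        .computeIfAbsent(dimension, k -> Collections.synchronizedSet(new HashSet<>()));
    if (rowValues == null || rowValues.isEmpty()) {
      values.add(null);
    } else {
      values.addAll(rowValues);
    }
  }

  void markRestartSpanned(String segmentKey)
  {
    restartSpanned.add(segmentKey);
  }

  /** Returns sorted per-dimension values, or an empty map when nothing can be safely declared. */
  Map<String, List<String>> snapshot(String segmentKey, List<String> dimensions)
  {
    final Map<String, List<String>> filters = new HashMap<>();
    final Map<String, Set<String>> perDim = observed.get(segmentKey);
    if (restartSpanned.contains(segmentKey) || perDim == null) {
      return filters;
    }
    for (String dim : dimensions) {
      final Set<String> values = perDim.get(dim);
      if (values == null) {
        continue;
      }
      final List<String> copy;
      // Copy under the set's own monitor, since a writer thread may still be adding values.
      synchronized (values) {
        if (values.isEmpty()) {
          continue;
        }
        copy = new ArrayList<>(values);
      }
      copy.sort(Comparator.nullsFirst(Comparator.naturalOrder()));
      filters.put(dim, copy);
    }
    return filters;
  }
}
```

The inner sets are synchronized HashSets rather than concurrent sets because the JDK's concurrent set implementations reject null elements, and null is the marker for a missing value here.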
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
index 6dda5942119..8de908844c5 100644
--- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskTuningConfig.java
@@ -19,6 +19,7 @@
package org.apache.druid.indexing.seekablestream;
+import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import org.apache.druid.common.config.Configs;
import org.apache.druid.indexer.partitions.DynamicPartitionsSpec;
@@ -70,6 +71,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements TuningConfi
private final int numPersistThreads;
private final int maxColumnsToMerge;
private final boolean releaseLocksOnHandoff;
+ @Nullable
+ private final StreamingPartitionsSpec streamingPartitionsSpec;
public SeekableStreamIndexTaskTuningConfig(
@Nullable AppendableIndexSpec appendableIndexSpec,
@@ -97,6 +100,62 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements TuningConfi
@Nullable Boolean releaseLocksOnHandoff
)
{
+ this(
+ appendableIndexSpec,
+ maxRowsInMemory,
+ maxBytesInMemory,
+ skipBytesInMemoryOverheadCheck,
+ maxRowsPerSegment,
+ maxTotalRows,
+ intermediatePersistPeriod,
+ basePersistDirectory,
+ maxPendingPersists,
+ indexSpec,
+ indexSpecForIntermediatePersists,
+ reportParseExceptions,
+ handoffConditionTimeout,
+ resetOffsetAutomatically,
+ skipSequenceNumberAvailabilityCheck,
+ segmentWriteOutMediumFactory,
+ intermediateHandoffPeriod,
+ logParseExceptions,
+ maxParseExceptions,
+ maxSavedParseExceptions,
+ numPersistThreads,
+ maxColumnsToMerge,
+ releaseLocksOnHandoff,
+ null
+ );
+ }
+
+ public SeekableStreamIndexTaskTuningConfig(
+ @Nullable AppendableIndexSpec appendableIndexSpec,
+ @Nullable Integer maxRowsInMemory,
+ @Nullable Long maxBytesInMemory,
+ @Nullable Boolean skipBytesInMemoryOverheadCheck,
+ @Nullable Integer maxRowsPerSegment,
+ @Nullable Long maxTotalRows,
+ @Nullable Period intermediatePersistPeriod,
+ @Nullable File basePersistDirectory,
+ @Nullable Integer maxPendingPersists,
+ @Nullable IndexSpec indexSpec,
+ @Nullable IndexSpec indexSpecForIntermediatePersists,
+ @Deprecated @Nullable Boolean reportParseExceptions,
+ @Nullable Long handoffConditionTimeout,
+ @Nullable Boolean resetOffsetAutomatically,
+ Boolean skipSequenceNumberAvailabilityCheck,
+ @Nullable SegmentWriteOutMediumFactory segmentWriteOutMediumFactory,
+ @Nullable Period intermediateHandoffPeriod,
+ @Nullable Boolean logParseExceptions,
+ @Nullable Integer maxParseExceptions,
+ @Nullable Integer maxSavedParseExceptions,
+ @Nullable Integer numPersistThreads,
+ @Nullable Integer maxColumnsToMerge,
+ @Nullable Boolean releaseLocksOnHandoff,
+ @Nullable StreamingPartitionsSpec streamingPartitionsSpec
+ )
+ {
+ this.streamingPartitionsSpec = streamingPartitionsSpec;
this.appendableIndexSpec = appendableIndexSpec == null ?
DEFAULT_APPENDABLE_INDEX : appendableIndexSpec;
this.maxRowsInMemory = maxRowsInMemory == null ?
DEFAULT_MAX_ROWS_IN_MEMORY_REALTIME : maxRowsInMemory;
this.partitionsSpec = new DynamicPartitionsSpec(maxRowsPerSegment,
maxTotalRows);
@@ -198,6 +257,18 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements TuningConfi
return partitionsSpec;
}
+ /**
+ * Dimensions whose observed values are recorded per published segment for query-time pruning via
+ * {@link org.apache.druid.timeline.partition.DimensionValueSetShardSpec}. Null when not configured.
+ */
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ public StreamingPartitionsSpec getStreamingPartitionsSpec()
+ {
+ return streamingPartitionsSpec;
+ }
+
@JsonProperty
public Period getIntermediatePersistPeriod()
{
@@ -336,7 +407,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements TuningConfi
Objects.equals(indexSpec, that.indexSpec) &&
Objects.equals(indexSpecForIntermediatePersists,
that.indexSpecForIntermediatePersists) &&
Objects.equals(segmentWriteOutMediumFactory,
that.segmentWriteOutMediumFactory) &&
- Objects.equals(intermediateHandoffPeriod,
that.intermediateHandoffPeriod);
+ Objects.equals(intermediateHandoffPeriod,
that.intermediateHandoffPeriod) &&
+ Objects.equals(streamingPartitionsSpec,
that.streamingPartitionsSpec);
}
@Override
@@ -364,7 +436,8 @@ public abstract class SeekableStreamIndexTaskTuningConfig
implements TuningConfi
maxSavedParseExceptions,
numPersistThreads,
maxColumnsToMerge,
- releaseLocksOnHandoff
+ releaseLocksOnHandoff,
+ streamingPartitionsSpec
);
}
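The tuning config change keeps the previous constructor signature and has it delegate to the new one with a null spec, so existing subclasses and callers compile unchanged. A reduced sketch of that delegation pattern follows. It uses a hypothetical two-field class and a placeholder default, not the real SeekableStreamIndexTaskTuningConfig.

```java
import java.util.List;

public class TuningConfigSketch
{
  // Placeholder default for illustration only; not taken from the patch.
  static final int DEFAULT_MAX_ROWS_PER_SEGMENT = 1_000;

  private final int maxRowsPerSegment;
  private final List<String> partitionDimensions; // null when the feature is not configured

  /** Pre-existing signature: delegates with null so old call sites keep their behavior. */
  public TuningConfigSketch(Integer maxRowsPerSegment)
  {
    this(maxRowsPerSegment, null);
  }

  /** New signature carrying the optional field. */
  public TuningConfigSketch(Integer maxRowsPerSegment, List<String> partitionDimensions)
  {
    this.maxRowsPerSegment = maxRowsPerSegment == null ? DEFAULT_MAX_ROWS_PER_SEGMENT : maxRowsPerSegment;
    this.partitionDimensions = partitionDimensions;
  }

  public int getMaxRowsPerSegment()
  {
    return maxRowsPerSegment;
  }

  public List<String> getPartitionDimensions()
  {
    return partitionDimensions;
  }
}
```

Keeping the field nullable, rather than defaulting it to an empty list, lets serialization omit it entirely when unset, which matches the NON_NULL include on the getter in the patch.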
diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamingPartitionsSpec.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamingPartitionsSpec.java
new file mode 100644
index 00000000000..f489f32aff3
--- /dev/null
+++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/StreamingPartitionsSpec.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.indexing.seekablestream;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Streaming analog of the batch/compaction {@code partitionsSpec}. Configured in the streaming tuning config, it
+ * declares the dimensions whose observed values each published segment should record in a
+ * {@link org.apache.druid.timeline.partition.DimensionValueSetShardSpec} so the broker can prune segments at query time
+ * without waiting for compaction. Unlike batch partitioning this does not route rows into shards; it only annotates
+ * segments with the values they happened to ingest.
+ *
+ * <p>Use low-to-medium cardinality dimensions; the {@code partitionDimensions} here should be kept in sync with the
+ * {@code partitionDimensions} of the compaction config for the same datasource.
+ */
+public class StreamingPartitionsSpec
+{
+ private final List<String> partitionDimensions;
+
+ @JsonCreator
+ public StreamingPartitionsSpec(
+ @JsonProperty("partitionDimensions") @Nullable List<String>
partitionDimensions
+ )
+ {
+ this.partitionDimensions = partitionDimensions == null ?
Collections.emptyList() : partitionDimensions;
+ }
+
+ @JsonProperty
+ @JsonInclude(JsonInclude.Include.NON_EMPTY)
+ public List<String> getPartitionDimensions()
+ {
+ return partitionDimensions;
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ StreamingPartitionsSpec that = (StreamingPartitionsSpec) o;
+ return Objects.equals(partitionDimensions, that.partitionDimensions);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(partitionDimensions);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StreamingPartitionsSpec{partitionDimensions=" +
partitionDimensions + '}';
+ }
+
+ public static List<String> getPartitionDimensionsOrEmpty(@Nullable
StreamingPartitionsSpec spec)
+ {
+ return spec == null ? List.of() : spec.getPartitionDimensions();
+ }
+}
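To make the pruning contract concrete: a segment may be skipped only when a dimension has declared values and none of them can match the filter. An empty filter map or an undeclared dimension never prunes, and a declared null keeps IS NULL queries from being pruned. The sketch below illustrates that decision with plain equality membership. It is a simplification under stated assumptions: the actual DimensionValueSetShardSpec is not part of this excerpt, the commit message describes range intersection rather than equality, and all names here are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class ValueSetPruningSketch
{
  /**
   * Returns false only when the segment provably holds no row whose {@code dimension}
   * equals {@code wanted}; a null {@code wanted} stands for an IS NULL filter.
   */
  static boolean mayContain(Map<String, List<String>> declared, String dimension, String wanted)
  {
    final List<String> values = declared.get(dimension);
    if (values == null) {
      // Nothing declared for this dimension (e.g. the empty-filter fallback): never prune.
      return true;
    }
    // Manual loop with Objects.equals so a null probe is safe for any List implementation.
    for (String value : values) {
      if (Objects.equals(value, wanted)) {
        return true;
      }
    }
    return false;
  }
}
```

For example, with `{"tenant": [null, "tenant_a"]}` declared, a filter on `tenant = 'tenant_b'` can skip the segment, while `tenant IS NULL` and any filter on an undeclared dimension cannot.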
diff --git a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
index 01c22ed36d0..6d17fdb4276 100644
--- a/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
+++ b/indexing-service/src/test/java/org/apache/druid/indexing/seekablestream/SeekableStreamIndexTaskRunnerTest.java
@@ -60,6 +60,9 @@ import org.apache.druid.server.coordination.ServerType;
import org.apache.druid.server.coordinator.CreateDataSegments;
import org.apache.druid.server.security.AuthTestUtils;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
+import org.apache.druid.timeline.partition.DimensionValueSetShardSpec;
+import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.joda.time.DateTime;
import org.joda.time.Period;
import org.junit.Assert;
@@ -80,6 +83,7 @@ import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -335,7 +339,8 @@ public class SeekableStreamIndexTaskRunnerTest
final StreamAppenderatorDriver driver =
Mockito.mock(StreamAppenderatorDriver.class);
Mockito.when(task.newDriver(any(), any(), any()))
.thenReturn(driver);
- Mockito.when(driver.publish(any(), any(), any()))
+ // publishAndRegisterHandoff calls the 4-arg publish overload (with the shard-spec annotator function).
+ Mockito.when(driver.publish(any(), any(), any(), any()))
.thenReturn(Futures.immediateFuture(commitMetadata));
Mockito.when(driver.registerHandoff(any()))
.thenReturn(Futures.immediateFuture(commitMetadata));
@@ -352,6 +357,257 @@ public class SeekableStreamIndexTaskRunnerTest
emitter.verifyValue("ingest/rows/published", 10_000L);
}
+ @Test
+ public void testAnnotateSegmentStampsDimensionValueSetShardSpecForObservedValues() throws Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createRunner(
+ Map.of("partition", "0"),
+ Map.of("partition", "100")
+ );
+ Mockito.when(task.getTuningConfig().getStreamingPartitionsSpec())
+ .thenReturn(new StreamingPartitionsSpec(List.of("tenant")));
+
+ final DataSegment segment = createSingleSegment();
+ final SegmentId lookupKey = segment.getId();
+ // Observe out of order; the published values must come back sorted.
+ observe(runner, lookupKey, "tenant", "tenant_c", "tenant_a", "tenant_b");
+
+ final DataSegment annotated =
runner.annotateSegmentWithPartitionDimensionValues(segment);
+
+ Assert.assertTrue(
+ "A segment created during the current run with observed values should get a DimensionValueSetShardSpec",
+ annotated.getShardSpec() instanceof DimensionValueSetShardSpec
+ );
+ final DimensionValueSetShardSpec shardSpec = (DimensionValueSetShardSpec)
annotated.getShardSpec();
+ Assert.assertEquals(
+ Arrays.asList("tenant_a", "tenant_b", "tenant_c"),
+ shardSpec.getPartitionDimensionValues().get("tenant")
+ );
+ }
+
+ /**
+ * A segment that spans a task restart has incomplete observed values, so it must NOT declare any partition filters
+ * (no pruning), to avoid wrongly pruning pre-restart rows. It is still stamped with an empty-filter
+ * {@link DimensionValueSetShardSpec} (not a bare {@link NumberedShardSpec}) so that all segments in an interval keep a
+ * uniform shard-spec class for {@link org.apache.druid.segment.realtime.appenderator.SegmentPublisherHelper}, which
+ * rejects a publish batch mixing shard-spec classes within an interval.
+ */
+ @Test
+ public void testRestartSpannedSegmentGetsEmptyFilterDimensionValueSetShardSpec() throws Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "100")
+ );
+ Mockito.when(task.getTuningConfig().getStreamingPartitionsSpec())
+ .thenReturn(new StreamingPartitionsSpec(List.of("tenant")));
+
+ final DataSegment segment = createSingleSegment();
+ final SegmentId lookupKey = segment.getId();
+
+ // Post-restart, only tenant_c is observed; tenant_a/tenant_b live only in pre-restart hydrants.
+ observe(runner, lookupKey, "tenant", "tenant_c");
+ // The runner marks this segment as restored-from-disk (spans a restart).
+ markRestartSpanned(runner, lookupKey);
+
+ final DataSegment annotated =
runner.annotateSegmentWithPartitionDimensionValues(segment);
+
+ Assert.assertTrue(
+ "A restart-spanned segment must be stamped with a DimensionValueSetShardSpec (class-uniform with freshly-stamped "
+ + "segments in the same interval) so SegmentPublisherHelper does not reject the publish",
+ annotated.getShardSpec() instanceof DimensionValueSetShardSpec
+ );
+ Assert.assertTrue(
+ "Its filters must be empty (no pruning) so incompletely-observed pre-restart rows are never pruned away",
+ ((DimensionValueSetShardSpec)
annotated.getShardSpec()).getPartitionDimensionValues().isEmpty()
+ );
+ }
+
+ /**
+ * A restart batch mixes a restart-spanned partition (empty-filter fallback) with a freshly-observed one in the same
+ * interval. Both must keep a uniform shard-spec class so the publish isn't rejected.
+ */
+ @Test
+ public void
testRestartBatchMixingFallbackAndObservedSegmentsPublishesWithDimensionValueSetShardSpec()
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "100")
+ );
+ Mockito.when(task.getTuningConfig().getStreamingPartitionsSpec())
+ .thenReturn(new StreamingPartitionsSpec(List.of("tenant")));
+
+ // Two partitions in one interval: partition 0 was restored from disk across a restart, partition 1 created after.
+ final List<DataSegment> sameIntervalPartitions = CreateDataSegments
+ .ofDatasource(DATA_SOURCE)
+ .startingAt("2025-01-01")
+ .forIntervals(1, Granularities.DAY)
+ .withNumPartitions(2)
+ .eachOfSizeInMb(500);
+ final DataSegment restartSpanned = sameIntervalPartitions.get(0);
+ final DataSegment freshlyObserved = sameIntervalPartitions.get(1);
+
+ markRestartSpanned(runner, restartSpanned.getId());
+ observe(runner, restartSpanned.getId(), "tenant", "tenant_c");
+ observe(runner, freshlyObserved.getId(), "tenant", "tenant_a");
+
+ final DataSegment annotatedRestartSpanned =
runner.annotateSegmentWithPartitionDimensionValues(restartSpanned);
+ final DataSegment annotatedFreshlyObserved =
runner.annotateSegmentWithPartitionDimensionValues(freshlyObserved);
+
+ Assert.assertEquals(
+ annotatedRestartSpanned.getShardSpec().getClass(),
+ annotatedFreshlyObserved.getShardSpec().getClass()
+ );
+ Assert.assertTrue(annotatedRestartSpanned.getShardSpec() instanceof
DimensionValueSetShardSpec);
+ Assert.assertTrue(
+ ((DimensionValueSetShardSpec)
annotatedRestartSpanned.getShardSpec()).getPartitionDimensionValues().isEmpty()
+ );
+ Assert.assertEquals(
+ List.of("tenant_a"),
+ ((DimensionValueSetShardSpec)
annotatedFreshlyObserved.getShardSpec()).getPartitionDimensionValues().get("tenant")
+ );
+ }
+
+ /**
+ * A dimension that ingested a null/missing value declares null (as a null
list element) alongside its non-null
+ * values, so {@code IS NULL} queries are not pruned. Here tenant saw
tenant_a and a null; region saw only us-west.
+ */
+ @Test
+ public void testNullValuedDimensionDeclaresNullInPartitionDimensionValues()
throws Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "100")
+ );
+ Mockito.when(task.getTuningConfig().getStreamingPartitionsSpec())
+ .thenReturn(new StreamingPartitionsSpec(List.of("tenant",
"region")));
+
+ final DataSegment segment = createSingleSegment();
+ final SegmentId lookupKey = segment.getId();
+
+ // tenant saw a non-null value and (in another row) a null/missing value;
region only saw non-null values.
+ observe(runner, lookupKey, "tenant", "tenant_a", null);
+ observe(runner, lookupKey, "region", "us-west");
+
+ final DataSegment annotated =
runner.annotateSegmentWithPartitionDimensionValues(segment);
+
+ Assert.assertTrue(
+ annotated.getShardSpec() instanceof DimensionValueSetShardSpec
+ );
+ final DimensionValueSetShardSpec shardSpec = (DimensionValueSetShardSpec)
annotated.getShardSpec();
+ // tenant declares both its non-null value AND null, so IS NULL queries
are not pruned.
+ Assert.assertEquals(
+ Arrays.asList(null, "tenant_a"),
+ shardSpec.getPartitionDimensionValues().get("tenant")
+ );
+ Assert.assertEquals(
+ ImmutableSet.of("us-west"),
+
ImmutableSet.copyOf(shardSpec.getPartitionDimensionValues().get("region"))
+ );
+ }
+
+ /**
+ * A dimension that ingested only a null value declares {@code [null]} —
pruned for concrete-value queries but never
+ * for {@code IS NULL}.
+ */
+ @Test
+ public void testOnlyNullValuedDimensionDeclaresNull() throws Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "100")
+ );
+ Mockito.when(task.getTuningConfig().getStreamingPartitionsSpec())
+ .thenReturn(new StreamingPartitionsSpec(List.of("tenant")));
+
+ final DataSegment segment = createSingleSegment();
+ final SegmentId lookupKey = segment.getId();
+
+ observe(runner, lookupKey, "tenant", (String) null);
+
+ final DataSegment annotated =
runner.annotateSegmentWithPartitionDimensionValues(segment);
+
+ Assert.assertTrue(annotated.getShardSpec() instanceof
DimensionValueSetShardSpec);
+ final DimensionValueSetShardSpec shardSpec = (DimensionValueSetShardSpec)
annotated.getShardSpec();
+ Assert.assertEquals(
+ Collections.singletonList(null),
+ shardSpec.getPartitionDimensionValues().get("tenant")
+ );
+ }
+
+ /**
+ * Feature on, but a segment ingested no values for any tracked dimension
(nothing recorded under its key). It still
+ * gets an empty-filter {@link DimensionValueSetShardSpec} rather than being
returned as a bare {@link NumberedShardSpec},
+ * so it stays class-uniform with its interval siblings for
+ * {@link
org.apache.druid.segment.realtime.appenderator.SegmentPublisherHelper}.
+ */
+ @Test
+ public void
testSegmentWithNoObservedValuesGetsEmptyFilterDimensionValueSetShardSpec()
throws Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "100")
+ );
+ Mockito.when(task.getTuningConfig().getStreamingPartitionsSpec())
+ .thenReturn(new StreamingPartitionsSpec(List.of("tenant")));
+
+ // No observe(...) call: nothing was recorded for this segment.
+ final DataSegment annotated =
runner.annotateSegmentWithPartitionDimensionValues(createSingleSegment());
+
+ Assert.assertTrue(annotated.getShardSpec() instanceof
DimensionValueSetShardSpec);
+ Assert.assertTrue(
+ "A segment with no observed values declares no filters (no pruning)
but stays a DimensionValueSetShardSpec",
+ ((DimensionValueSetShardSpec)
annotated.getShardSpec()).getPartitionDimensionValues().isEmpty()
+ );
+ }
+
+ /**
+ * Feature off (no streamingPartitionsSpec): the segment is returned
completely unchanged, retaining its original
+ * shard spec.
+ */
+ @Test
+ public void testFeatureOffReturnsSegmentUnchanged() throws Exception
+ {
+ final TestSeekableStreamIndexTaskRunner runner = createRunner(
+ ImmutableMap.of("partition", "0"),
+ ImmutableMap.of("partition", "100")
+ );
+
Mockito.when(task.getTuningConfig().getStreamingPartitionsSpec()).thenReturn(null);
+
+ final DataSegment segment = createSingleSegment();
+ final DataSegment annotated =
runner.annotateSegmentWithPartitionDimensionValues(segment);
+
+ Assert.assertSame("With the feature off the segment must be returned
unchanged", segment, annotated);
+ }
+
+ private static DataSegment createSingleSegment()
+ {
+ return CreateDataSegments
+ .ofDatasource(DATA_SOURCE)
+ .startingAt("2025-01-01")
+ .forIntervals(1, Granularities.DAY)
+ .withNumPartitions(1)
+ .eachOfSizeInMb(500)
+ .get(0);
+ }
+
+ private static void observe(
+ SeekableStreamIndexTaskRunner runner,
+ SegmentId segmentId,
+ String dimension,
+ String... values
+ )
+ {
+ for (String value : values) {
+ runner.recordObservedDimensionValueForTest(segmentId, dimension, value);
+ }
+ }
+
+ private static void markRestartSpanned(SeekableStreamIndexTaskRunner runner,
SegmentId segmentId)
+ {
+ runner.markSegmentRestartSpannedForTest(segmentId);
+ }
+
private TaskToolbox createTaskToolbox()
{
final TestUtils testUtils = new TestUtils();
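The tests above exercise three behaviours: distinct values are accumulated per segment and per dimension, a null is kept as an ordinary list element, and a restart-spanned or never-observed segment falls back to an empty declaration. A minimal stdlib-only sketch of that bookkeeping follows. The class ObservedValueTrackerSketch, its method names, and the use of plain strings as segment ids are all hypothetical and are not Druid APIs; the nulls-first ordering is an assumption chosen to match the expected list Arrays.asList(null, "tenant_a") in the test.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical model of per-segment value tracking; not Druid's implementation.
class ObservedValueTrackerSketch
{
  // segment id -> dimension -> distinct observed values (null is a legal element)
  private final Map<String, Map<String, Set<String>>> observed = new HashMap<>();
  // segments restored across a restart, whose pre-restart rows were never observed
  private final Set<String> restartSpanned = new HashSet<>();

  void observe(String segmentId, String dimension, String value)
  {
    observed.computeIfAbsent(segmentId, k -> new HashMap<>())
            .computeIfAbsent(dimension, k -> new HashSet<>())
            .add(value);
  }

  void markRestartSpanned(String segmentId)
  {
    restartSpanned.add(segmentId);
  }

  // Values to declare at publish time. An empty map declares nothing, so nothing is pruned.
  Map<String, List<String>> valuesToDeclare(String segmentId)
  {
    if (restartSpanned.contains(segmentId) || !observed.containsKey(segmentId)) {
      return Collections.emptyMap();
    }
    final Map<String, List<String>> declared = new TreeMap<>();
    for (Map.Entry<String, Set<String>> entry : observed.get(segmentId).entrySet()) {
      final List<String> values = new ArrayList<>(entry.getValue());
      // Assumed ordering: null first, then natural string order.
      values.sort(Comparator.nullsFirst(Comparator.naturalOrder()));
      declared.put(entry.getKey(), values);
    }
    return declared;
  }

  public static void main(String[] args)
  {
    final ObservedValueTrackerSketch tracker = new ObservedValueTrackerSketch();
    tracker.observe("seg0", "tenant", "tenant_a");
    tracker.observe("seg0", "tenant", null);
    tracker.observe("seg1", "tenant", "tenant_c");
    tracker.markRestartSpanned("seg1");
    System.out.println(tracker.valuesToDeclare("seg0"));
    System.out.println(tracker.valuesToDeclare("seg1"));
  }
}
```

Returning an empty map for the restart-spanned segment, rather than its partial observations, is what keeps pre-restart rows from being pruned away.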
diff --git
a/processing/src/main/java/org/apache/druid/timeline/partition/DimensionValueSetShardSpec.java
b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionValueSetShardSpec.java
new file mode 100644
index 00000000000..6be48e37026
--- /dev/null
+++
b/processing/src/main/java/org/apache/druid/timeline/partition/DimensionValueSetShardSpec.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+
+import javax.annotation.Nullable;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+/**
+ * A {@link NumberedShardSpec} that additionally declares, per dimension, the
set of values a streaming segment
+ * contains ({@link #partitionDimensionValues}), letting the broker prune
segments whose values cannot match a query filter
+ * before compaction. A dimension absent from {@link
#partitionDimensionValues} is not pruned on.
+ */
+public class DimensionValueSetShardSpec extends NumberedShardSpec
+{
+ /**
+ * Maps dimension name → exhaustive list of values that can appear in this
shard for that dimension.
+ * An absent dimension means "all values possible" (no pruning on that
dimension).
+ */
+ private final Map<String, List<String>> partitionDimensionValues;
+
+ @JsonCreator
+ public DimensionValueSetShardSpec(
+ @JsonProperty("partitionNum") int partitionNum,
+ @JsonProperty("partitions") int partitions,
+ @JsonProperty("partitionDimensionValues") @Nullable Map<String,
List<String>> partitionDimensionValues
+ )
+ {
+ super(partitionNum, partitions);
+ this.partitionDimensionValues = partitionDimensionValues == null ?
Collections.emptyMap() : partitionDimensionValues;
+ }
+
+ @JsonProperty("partitionDimensionValues")
+ public Map<String, List<String>> getPartitionDimensionValues()
+ {
+ return partitionDimensionValues;
+ }
+
+ @Override
+ public List<String> getDomainDimensions()
+ {
+ return ImmutableList.copyOf(partitionDimensionValues.keySet());
+ }
+
+ /**
+ * Returns false only when the query filter explicitly constrains a
dimension that this shard declares,
+ * and none of this shard's allowed values for that dimension fall within
the filter domain.
+ *
+ * <p>A null entry in a dimension's allowed-values list denotes a row whose
value was null/missing. Druid encodes a
+ * null match in the query domain as the range {@code (-inf, "")} (see e.g.
{@code NullFilter}), so a declared null
+ * is tested against the domain as {@link Range#lessThan} {@code ""} rather
than as a point value. Every other value
+ * (including the empty string {@code ""}) is tested as a singleton point,
keeping null and {@code ""} distinct.
+ *
+ * @return true if the segment must be considered for the query, false if it can
be pruned
+ */
+ @Override
+ public boolean possibleInDomain(Map<String, RangeSet<String>> domain)
+ {
+ if (partitionDimensionValues.isEmpty()) {
+ return true;
+ }
+
+ for (Map.Entry<String, List<String>> entry :
partitionDimensionValues.entrySet()) {
+ final String dimension = entry.getKey();
+ final List<String> allowedValues = entry.getValue();
+
+ final RangeSet<String> domainRangeSet = domain.get(dimension);
+ if (domainRangeSet == null || domainRangeSet.isEmpty()) {
+ // Query doesn't constrain this dimension — cannot prune on it.
+ continue;
+ }
+
+ boolean anyMatch = false;
+ for (String value : allowedValues) {
+ // Null is represented in the domain as the range (-inf, ""); any
other value as a singleton point.
+ final Range<String> valueRange = value == null ? Range.lessThan("") :
Range.singleton(value);
+ if (!domainRangeSet.subRangeSet(valueRange).isEmpty()) {
+ anyMatch = true;
+ break;
+ }
+ }
+ if (!anyMatch) {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ @Override
+ public String getType()
+ {
+ return Type.DIM_VALUE_SET;
+ }
+
+ @Override
+ public ShardSpec withPartitionNum(int partitionNum)
+ {
+ return new DimensionValueSetShardSpec(partitionNum,
getNumCorePartitions(), partitionDimensionValues);
+ }
+
+ @Override
+ public ShardSpec withCorePartitions(int partitions)
+ {
+ return new DimensionValueSetShardSpec(getPartitionNum(), partitions,
partitionDimensionValues);
+ }
+
+ @Override
+ public boolean equals(Object o)
+ {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ if (!super.equals(o)) {
+ return false;
+ }
+ DimensionValueSetShardSpec that = (DimensionValueSetShardSpec) o;
+ return Objects.equals(partitionDimensionValues,
that.partitionDimensionValues);
+ }
+
+ @Override
+ public int hashCode()
+ {
+ return Objects.hash(super.hashCode(), partitionDimensionValues);
+ }
+
+ @Override
+ public String toString()
+ {
+ return "DimensionValueSetShardSpec{" +
+ "partitionNum=" + getPartitionNum() +
+ ", partitions=" + getNumCorePartitions() +
+ ", partitionDimensionValues=" + partitionDimensionValues +
+ '}';
+ }
+}
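The null handling in possibleInDomain can be easier to follow without Guava in the way. The sketch below is a simplified stdlib-only model, not the class from this diff: Interval and PruningSketch are hypothetical names, and the query domain is modelled as a plain list of intervals per dimension instead of a RangeSet. It keeps the same two rules: a declared null is tested as the open range below "", and every other value (including "") is tested as a single point.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Simplified model of the pruning check; the real class uses Guava's RangeSet.
class PruningSketch
{
  // An interval over strings. A null bound means unbounded on that side.
  static final class Interval
  {
    final String lower;
    final boolean lowerInclusive;
    final String upper;
    final boolean upperInclusive;

    Interval(String lower, boolean lowerInclusive, String upper, boolean upperInclusive)
    {
      this.lower = lower;
      this.lowerInclusive = lowerInclusive;
      this.upper = upper;
      this.upperInclusive = upperInclusive;
    }

    static Interval point(String v)
    {
      return new Interval(v, true, v, true);
    }

    static Interval lessThan(String v)
    {
      return new Interval(null, false, v, false);
    }

    boolean intersects(Interval other)
    {
      return notPast(this, other) && notPast(other, this);
    }

    // True when a's lower bound does not lie beyond b's upper bound.
    private static boolean notPast(Interval a, Interval b)
    {
      if (a.lower == null || b.upper == null) {
        return true;
      }
      final int cmp = a.lower.compareTo(b.upper);
      return cmp < 0 || (cmp == 0 && a.lowerInclusive && b.upperInclusive);
    }
  }

  static boolean possibleInDomain(Map<String, List<String>> declared, Map<String, List<Interval>> domain)
  {
    for (Map.Entry<String, List<String>> entry : declared.entrySet()) {
      final List<Interval> filter = domain.get(entry.getKey());
      if (filter == null || filter.isEmpty()) {
        continue; // the query does not constrain this dimension
      }
      boolean anyMatch = false;
      for (String value : entry.getValue()) {
        // null is modelled as the range (-inf, ""), any other value as a point
        final Interval valueRange = value == null ? Interval.lessThan("") : Interval.point(value);
        if (filter.stream().anyMatch(valueRange::intersects)) {
          anyMatch = true;
          break;
        }
      }
      if (!anyMatch) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args)
  {
    final Map<String, List<Interval>> isNull =
        Collections.singletonMap("tenant", Collections.singletonList(Interval.lessThan("")));
    final Map<String, List<String>> onlyEmptyString =
        Collections.singletonMap("tenant", Collections.singletonList(""));
    final Map<String, List<String>> nullAndTenantA =
        Collections.singletonMap("tenant", Arrays.asList("tenant_a", null));
    System.out.println(possibleInDomain(onlyEmptyString, isNull));
    System.out.println(possibleInDomain(nullAndTenantA, isNull));
  }
}
```

Because the upper bound of the null range is exclusive, the point "" never intersects it, which is how the empty string stays distinct from null.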
diff --git
a/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
b/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
index ab70cf9f39d..f1098d2282d 100644
---
a/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
+++
b/processing/src/main/java/org/apache/druid/timeline/partition/ShardSpec.java
@@ -43,6 +43,7 @@ import java.util.Map;
@JsonSubTypes.Type(name = ShardSpec.Type.LINEAR, value =
LinearShardSpec.class),
@JsonSubTypes.Type(name = ShardSpec.Type.NUMBERED, value =
NumberedShardSpec.class),
@JsonSubTypes.Type(name = ShardSpec.Type.HASHED, value =
HashBasedNumberedShardSpec.class),
+ @JsonSubTypes.Type(name = ShardSpec.Type.DIM_VALUE_SET, value =
DimensionValueSetShardSpec.class),
@JsonSubTypes.Type(name = ShardSpec.Type.NUMBERED_OVERWRITE, value =
NumberedOverwriteShardSpec.class),
// BuildingShardSpecs are the shardSpec with missing numCorePartitions,
and thus must not be published.
// See BuildingShardSpec for more details.
@@ -202,6 +203,7 @@ public interface ShardSpec
String LINEAR = "linear";
String NUMBERED = "numbered";
String HASHED = "hashed";
+ String DIM_VALUE_SET = "dim_value_set";
String NUMBERED_OVERWRITE = "numbered_overwrite";
diff --git
a/processing/src/test/java/org/apache/druid/timeline/partition/DimensionValueSetShardSpecTest.java
b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionValueSetShardSpecTest.java
new file mode 100644
index 00000000000..4d1e6285854
--- /dev/null
+++
b/processing/src/test/java/org/apache/druid/timeline/partition/DimensionValueSetShardSpecTest.java
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.timeline.partition;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Range;
+import com.google.common.collect.RangeSet;
+import com.google.common.collect.TreeRangeSet;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class DimensionValueSetShardSpecTest
+{
+ private static final String TENANT = "tenant";
+
+ private static DimensionValueSetShardSpec spec(Map<String, List<String>>
filters)
+ {
+ return new DimensionValueSetShardSpec(0, 1, filters);
+ }
+
+ private static RangeSet<String> points(String... values)
+ {
+ final RangeSet<String> rangeSet = TreeRangeSet.create();
+ for (String v : values) {
+ rangeSet.add(Range.singleton(v));
+ }
+ return rangeSet;
+ }
+
+ private static Map<String, RangeSet<String>> domain(String dimension,
String... values)
+ {
+ return ImmutableMap.of(dimension, points(values));
+ }
+
+ private static Map<String, RangeSet<String>> rangeFilter(String dimension,
String lower, String upper)
+ {
+ final RangeSet<String> rangeSet = TreeRangeSet.create();
+ rangeSet.add(Range.closed(lower, upper));
+ return ImmutableMap.of(dimension, rangeSet);
+ }
+
+ /**
+ * The query domain Druid builds for an {@code IS NULL} filter: a null match
is encoded as the range {@code (-inf, "")}
+ * (see e.g. {@code NullFilter#getDimensionRangeSet}).
+ */
+ private static Map<String, RangeSet<String>> nullDomain(String dimension)
+ {
+ final RangeSet<String> rangeSet = TreeRangeSet.create();
+ rangeSet.add(Range.lessThan(""));
+ return ImmutableMap.of(dimension, rangeSet);
+ }
+
+ @Test
+ public void testNoFilters_alwaysTrue()
+ {
+ final DimensionValueSetShardSpec s = spec(Collections.emptyMap());
+ Assert.assertTrue(s.possibleInDomain(domain(TENANT, "tenant_a")));
+ Assert.assertTrue(s.possibleInDomain(Collections.emptyMap()));
+ }
+
+ @Test
+ public void testSingleFilter_matchingValue_returnsTrue()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
List.of("tenant_a")));
+ Assert.assertTrue(s.possibleInDomain(domain(TENANT, "tenant_a")));
+ }
+
+ @Test
+ public void testSingleFilter_nonMatchingValue_returnsFalse()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
List.of("tenant_a")));
+ Assert.assertFalse(s.possibleInDomain(domain(TENANT, "tenant_b")));
+ }
+
+ @Test
+ public void
testSingleFilter_domainHasMultipleValues_matchIncluded_returnsTrue()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
List.of("tenant_a")));
+ Assert.assertTrue(s.possibleInDomain(domain(TENANT, "tenant_a",
"tenant_b")));
+ }
+
+ @Test
+ public void testSingleFilter_domainHasMultipleValues_noMatch_returnsFalse()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
List.of("tenant_a")));
+ Assert.assertFalse(s.possibleInDomain(domain(TENANT, "tenant_b",
"tenant_c")));
+ }
+
+ @Test
+ public void testMultipleAllowedValues_matchOne_returnsTrue()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
List.of("tenant_a", "tenant_b")));
+ Assert.assertTrue(s.possibleInDomain(domain(TENANT, "tenant_b")));
+ }
+
+ @Test
+ public void testMultipleAllowedValues_noMatch_returnsFalse()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
List.of("tenant_a", "tenant_b")));
+ Assert.assertFalse(s.possibleInDomain(domain(TENANT, "tenant_c")));
+ }
+
+ @Test
+ public void testDeclaredDimension_notInQueryDomain_returnsTrue()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
List.of("tenant_a")));
+ Assert.assertTrue(s.possibleInDomain(Collections.emptyMap()));
+ }
+
+ @Test
+ public void testDeclaredDimension_queryFiltersOnOtherDim_returnsTrue()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
List.of("tenant_a")));
+ Assert.assertTrue(s.possibleInDomain(domain("region", "us-west")));
+ }
+
+ @Test
+ public void testRangeFilter_onDeclaredDimension_returnsTrue()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
List.of("tenant_a")));
+ // The range ['a', 'z'] (e.g. TENANT BETWEEN 'a' AND 'z') contains the
declared value tenant_a, so the segment is not pruned.
+ Assert.assertTrue(s.possibleInDomain(rangeFilter(TENANT, "a", "z")));
+ }
+
+ @Test
+ public void testMultipleDimensions_allMatch_returnsTrue()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(
+ TENANT, List.of("tenant_a"),
+ "region", List.of("us-west")
+ ));
+ Assert.assertTrue(s.possibleInDomain(ImmutableMap.of(
+ TENANT, points("tenant_a"),
+ "region", points("us-west")
+ )));
+ }
+
+ @Test
+ public void testMultipleDimensions_oneDimensionMismatches_returnsFalse()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(
+ TENANT, List.of("tenant_a"),
+ "region", List.of("us-west")
+ ));
+ Assert.assertFalse(s.possibleInDomain(ImmutableMap.of(
+ TENANT, points("tenant_a"),
+ "region", points("eu-east")
+ )));
+ }
+
+ @Test
+ public void testMultipleDimensions_onlyOneDimensionInDomain()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(
+ TENANT, List.of("tenant_a"),
+ "region", List.of("us-west")
+ ));
+ Assert.assertTrue(s.possibleInDomain(domain(TENANT, "tenant_a")));
+ Assert.assertFalse(s.possibleInDomain(domain(TENANT, "tenant_b")));
+ }
+
+ @Test
+ public void testGetDomainDimensions_returnsFilterKeys()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(
+ TENANT, List.of("tenant_a"),
+ "region", List.of("us-west")
+ ));
+ Assert.assertTrue(s.getDomainDimensions().contains(TENANT));
+ Assert.assertTrue(s.getDomainDimensions().contains("region"));
+ Assert.assertEquals(2, s.getDomainDimensions().size());
+ }
+
+ @Test
+ public void testGetDomainDimensions_emptyFilters_returnsEmpty()
+ {
+
Assert.assertTrue(spec(Collections.emptyMap()).getDomainDimensions().isEmpty());
+ }
+
+ @Test
+ public void testGetType()
+ {
+ Assert.assertEquals(ShardSpec.Type.DIM_VALUE_SET,
spec(Collections.emptyMap()).getType());
+ }
+
+ private static ObjectMapper newMapper()
+ {
+ return new
ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
false);
+ }
+
+ @Test
+ public void testJsonSerdeRoundTrip() throws Exception
+ {
+ final ObjectMapper mapper = newMapper();
+ final DimensionValueSetShardSpec original = new DimensionValueSetShardSpec(
+ 3,
+ 8,
+ ImmutableMap.of(TENANT, List.of("tenant_a", "tenant_b"), "region",
List.of("us-west"))
+ );
+
+ final DimensionValueSetShardSpec deserialized =
+ mapper.readValue(mapper.writeValueAsString(original),
DimensionValueSetShardSpec.class);
+
+ Assert.assertEquals(original, deserialized);
+ Assert.assertEquals(original.getPartitionDimensionValues(),
deserialized.getPartitionDimensionValues());
+ }
+
+ @Test
+ public void testJsonSerdeContainsType() throws Exception
+ {
+ final ObjectMapper mapper = newMapper();
+ final DimensionValueSetShardSpec spec = new DimensionValueSetShardSpec(0,
1, ImmutableMap.of(TENANT, List.of("tenant_a")));
+ final String json = mapper.writeValueAsString(spec);
+ Assert.assertTrue(json.contains("\"type\":\"dim_value_set\""));
+ Assert.assertTrue(json.contains("\"partitionDimensionValues\""));
+ }
+
+ @Test
+ public void testJsonSerdeWithNullFilters() throws Exception
+ {
+ final ObjectMapper mapper = newMapper();
+ final DimensionValueSetShardSpec original = new
DimensionValueSetShardSpec(0, 1, null);
+
+ final DimensionValueSetShardSpec deserialized =
+ mapper.readValue(mapper.writeValueAsString(original),
DimensionValueSetShardSpec.class);
+
+ Assert.assertEquals(original, deserialized);
+ Assert.assertTrue(deserialized.getPartitionDimensionValues().isEmpty());
+ }
+
+ @Test
+ public void testEmptyStringValue_isDistinctFromNull()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
List.of("")));
+ Assert.assertTrue(s.possibleInDomain(domain(TENANT, "")));
+ Assert.assertFalse(s.possibleInDomain(domain(TENANT, "tenant_a")));
+ // An IS NULL query (domain = (-inf, "")) must NOT match a segment that
only declares the empty string.
+ Assert.assertFalse(s.possibleInDomain(nullDomain(TENANT)));
+ }
+
+ @Test
+ public void testNullValue_matchesIsNullQueryOnly()
+ {
+ // A null/missing value is declared as a null list element.
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
Collections.singletonList(null)));
+ Assert.assertTrue(s.possibleInDomain(nullDomain(TENANT)));
+ Assert.assertFalse(s.possibleInDomain(domain(TENANT, "tenant_a")));
+ Assert.assertFalse(s.possibleInDomain(domain(TENANT, "")));
+ }
+
+ @Test
+ public void testConcreteValueOnly_isPrunedForIsNullQuery()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
List.of("tenant_a")));
+ Assert.assertFalse(s.possibleInDomain(nullDomain(TENANT)));
+ }
+
+ @Test
+ public void testNullAndConcreteValues_matchBoth()
+ {
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
Arrays.asList("tenant_a", null)));
+ Assert.assertTrue(s.possibleInDomain(nullDomain(TENANT)));
+ Assert.assertTrue(s.possibleInDomain(domain(TENANT, "tenant_a")));
+ Assert.assertFalse(s.possibleInDomain(domain(TENANT, "tenant_b")));
+ }
+
+ @Test
+ public void testNullValue_jsonSerdeRoundTrip() throws Exception
+ {
+ final ObjectMapper mapper = newMapper();
+ final DimensionValueSetShardSpec original =
+ new DimensionValueSetShardSpec(0, 1, ImmutableMap.of(TENANT,
Arrays.asList("tenant_a", null)));
+
+ final DimensionValueSetShardSpec deserialized =
+ mapper.readValue(mapper.writeValueAsString(original),
DimensionValueSetShardSpec.class);
+
+ Assert.assertEquals(original, deserialized);
+
Assert.assertTrue(deserialized.getPartitionDimensionValues().get(TENANT).contains(null));
+ Assert.assertTrue(deserialized.possibleInDomain(nullDomain(TENANT)));
+ }
+
+ @Test
+ public void testEmptyAllowedList_prunesEverything()
+ {
+ // An empty allowed list means no values were observed for the dimension,
so the segment is pruned for any query that constrains it.
+ final DimensionValueSetShardSpec s = spec(ImmutableMap.of(TENANT,
List.of()));
+ Assert.assertFalse(s.possibleInDomain(domain(TENANT, "tenant_a")));
+ }
+}
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
index 257b1db7278..b45c11bee7e 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/BaseAppenderatorDriver.java
@@ -632,23 +632,28 @@ public abstract class BaseAppenderatorDriver implements
Closeable
segmentsAndCommitMetadata.getSegmentSchemaMapping()
);
if (publishResult.isSuccess()) {
+ // Reconcile to the published shard specs (publishSegments
may annotate them, e.g. with DimensionValueSetShardSpec)
+ // so logging/handoff reports the real spec, not the
pre-publish one.
+ final SegmentsAndCommitMetadata publishedMetadata =
+
segmentsAndCommitMetadata.withPublishedSegments(publishResult.getSegments());
+
log.info(
"Published [%d] segments with commit metadata[%s].",
- segmentsAndCommitMetadata.getSegments().size(),
+ publishedMetadata.getSegments().size(),
callerMetadata
);
- log.infoSegments(segmentsAndCommitMetadata.getSegments(),
"Published segments");
+ log.infoSegments(publishedMetadata.getSegments(), "Published
segments");
// Log segments upgraded as a result of a concurrent replace
final Set<DataSegment> upgradedSegments = new
HashSet<>(publishResult.getSegments());
-
segmentsAndCommitMetadata.getSegments().forEach(upgradedSegments::remove);
+
publishedMetadata.getSegments().forEach(upgradedSegments::remove);
if (!upgradedSegments.isEmpty()) {
log.info("Published [%d] upgraded segments.",
upgradedSegments.size());
log.infoSegments(upgradedSegments, "Upgraded segments");
}
- log.info("Published segment schemas[%s].",
segmentsAndCommitMetadata.getSegmentSchemaMapping());
- return segmentsAndCommitMetadata
+ log.info("Published segment schemas[%s].",
publishedMetadata.getSegmentSchemaMapping());
+ return publishedMetadata
.withUpgradedSegments(upgradedSegments)
.withWasPublished(true);
} else {
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
index dac9b1b8f34..8418e357e7b 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/SegmentsAndCommitMetadata.java
@@ -24,9 +24,13 @@ import com.google.common.collect.ImmutableSet;
import org.apache.druid.segment.SegmentSchemaMapping;
import org.apache.druid.segment.SegmentUtils;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.SegmentId;
import javax.annotation.Nullable;
+import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
@@ -96,6 +100,30 @@ public class SegmentsAndCommitMetadata
);
}
+ /**
+ * Returns a copy whose segments are replaced (matched by id, preserving
order) with the corresponding entries from
+ * {@code publishedSegments}, so the metadata carries the actually-published
shard specs rather than the pre-publish
+ * ones. Ids not present in {@code publishedSegments} keep their original
entry.
+ */
+ public SegmentsAndCommitMetadata withPublishedSegments(Set<DataSegment>
publishedSegments)
+ {
+ final Map<SegmentId, DataSegment> byId = new HashMap<>();
+ for (DataSegment published : publishedSegments) {
+ byId.put(published.getId(), published);
+ }
+ final List<DataSegment> reconciled = new ArrayList<>(segments.size());
+ for (DataSegment original : segments) {
+ reconciled.add(byId.getOrDefault(original.getId(), original));
+ }
+ return new SegmentsAndCommitMetadata(
+ reconciled,
+ this.commitMetadata,
+ this.segmentSchemaMapping,
+ this.upgradedSegments,
+ this.wasPublished
+ );
+ }
+
public SegmentsAndCommitMetadata withWasPublished(boolean wasPublished)
{
return new SegmentsAndCommitMetadata(
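The reconciliation in withPublishedSegments is a replace-by-id that keeps the original order. A generic stdlib sketch of the same idea follows. ReconcileSketch and the "id:spec" string encoding are hypothetical stand-ins for DataSegment and SegmentId, used only to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical generic version of "replace entries by id, preserving order".
class ReconcileSketch
{
  static <K, T> List<T> reconcile(List<T> originals, Collection<T> published, Function<T, K> idOf)
  {
    final Map<K, T> byId = new HashMap<>();
    for (T p : published) {
      byId.put(idOf.apply(p), p);
    }
    final List<T> reconciled = new ArrayList<>(originals.size());
    for (T original : originals) {
      // Ids missing from the published set keep their original entry.
      reconciled.add(byId.getOrDefault(idOf.apply(original), original));
    }
    return reconciled;
  }

  public static void main(String[] args)
  {
    // Each string is "<id>:<shard spec type>"; the id is the part before the colon.
    final Function<String, String> idOf = s -> s.substring(0, s.indexOf(':'));
    final List<String> originals = Arrays.asList("seg0:numbered", "seg1:numbered", "seg2:numbered");
    final Set<String> published =
        new HashSet<>(Arrays.asList("seg1:dim_value_set", "seg0:dim_value_set", "seg9:dim_value_set"));
    System.out.println(reconcile(originals, published, idOf));
  }
}
```

Published entries with no matching original (seg9 in the sketch) are not appended to the result; only ids already in the original list are considered.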
diff --git
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
index 53b0fcbba3b..18b959afb93 100644
---
a/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
+++
b/server/src/main/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriver.java
@@ -279,6 +279,16 @@ public class StreamAppenderatorDriver extends
BaseAppenderatorDriver
final Committer committer,
final Collection<String> sequenceNames
)
+ {
+ return publish(publisher, committer, sequenceNames,
java.util.function.Function.identity());
+ }
+
+ public ListenableFuture<SegmentsAndCommitMetadata> publish(
+ final TransactionalSegmentPublisher publisher,
+ final Committer committer,
+ final Collection<String> sequenceNames,
+ final java.util.function.Function<Set<DataSegment>, Set<DataSegment>>
segmentAnnotateFunction
+ )
{
final List<SegmentIdWithShardSpec> theSegments =
getSegmentIdsWithShardSpecs(sequenceNames);
@@ -291,7 +301,7 @@ public class StreamAppenderatorDriver extends
BaseAppenderatorDriver
null,
sam,
publisher,
- java.util.function.Function.identity()
+ segmentAnnotateFunction
),
MoreExecutors.directExecutor()
);
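The new publish overload follows a common pattern: the existing signature delegates to the new one with Function.identity(), so existing callers see no behaviour change while new callers can pass an annotation step. A minimal sketch of that pattern, using strings in place of DataSegment; AnnotatingPublishSketch and stampValues are hypothetical names, and the method bodies do not reflect what the driver does beyond applying the function.

```java
import java.util.LinkedHashSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical illustration of an identity-defaulted overload.
class AnnotatingPublishSketch
{
  // Old entry point: behaves as before by passing the identity function.
  static Set<String> publish(Set<String> segments)
  {
    return publish(segments, Function.identity());
  }

  // New entry point: the caller decides how segments are annotated before publishing.
  static Set<String> publish(Set<String> segments, Function<Set<String>, Set<String>> annotate)
  {
    return annotate.apply(segments);
  }

  // Example annotation step: returns new entries instead of mutating the input.
  static Set<String> stampValues(Set<String> segments)
  {
    final Set<String> stamped = new LinkedHashSet<>();
    for (String segment : segments) {
      stamped.add(segment + "+dim_value_set");
    }
    return stamped;
  }

  public static void main(String[] args)
  {
    final Set<String> segments = new LinkedHashSet<>();
    segments.add("seg0");
    segments.add("seg1");
    System.out.println(publish(segments));
    System.out.println(publish(segments, AnnotatingPublishSketch::stampValues));
  }
}
```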
diff --git
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java
index ef4e54ff351..cef0dd424b1 100644
---
a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java
+++
b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/SegmentPublisherHelperTest.java
@@ -21,6 +21,7 @@ package org.apache.druid.segment.realtime.appenderator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.druid.data.input.StringTuple;
import org.apache.druid.java.util.common.Intervals;
@@ -30,6 +31,7 @@ import
org.apache.druid.timeline.partition.BuildingHashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.BuildingNumberedShardSpec;
import org.apache.druid.timeline.partition.BuildingSingleDimensionShardSpec;
import org.apache.druid.timeline.partition.DimensionRangeShardSpec;
+import org.apache.druid.timeline.partition.DimensionValueSetShardSpec;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashBucketShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
@@ -44,6 +46,7 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.Arrays;
+import java.util.Collections;
import java.util.Set;
public class SegmentPublisherHelperTest
@@ -209,6 +212,22 @@ public class SegmentPublisherHelperTest
Assert.assertEquals(segments, annotated);
}
+ @Test
+ public void testAnnotateShardSpecAllowsMixedFilterDimensionValueSetShardSpecsInSameInterval()
+ {
+ // Empty-filter and populated DimensionValueSetShardSpecs in one interval must not trip the mismatched shardSpecs
+ // check, so a restart batch publishes without blocking.
+ final Set<DataSegment> segments = ImmutableSet.of(
+ newSegment(new DimensionValueSetShardSpec(0, 0, Collections.emptyMap())),
+ newSegment(new DimensionValueSetShardSpec(1, 0, ImmutableMap.of("tenant", ImmutableList.of("tenant_a"))))
+ );
+ final Set<DataSegment> annotated = SegmentPublisherHelper.annotateShardSpec(segments);
+ Assert.assertEquals(segments, annotated);
+ for (DataSegment segment : annotated) {
+ Assert.assertSame(DimensionValueSetShardSpec.class, segment.getShardSpec().getClass());
+ }
+ }
+
@Test
public void testAnnotateShardSpecThrowingExceptionForBucketNumberedShardSpec()
{
diff --git a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
index 0eb46886e58..d5ba958a226 100644
--- a/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
+++ b/server/src/test/java/org/apache/druid/segment/realtime/appenderator/StreamAppenderatorDriverTest.java
@@ -43,6 +43,7 @@ import org.apache.druid.segment.handoff.SegmentHandoffNotifierFactory;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.realtime.SegmentGenerationMetrics;
import org.apache.druid.timeline.DataSegment;
+import org.apache.druid.timeline.partition.DimensionValueSetShardSpec;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.easymock.EasyMock;
import org.easymock.EasyMockSupport;
@@ -69,6 +70,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
+import java.util.stream.Collectors;
public class StreamAppenderatorDriverTest extends EasyMockSupport
{
@@ -177,6 +179,46 @@ public class StreamAppenderatorDriverTest extends EasyMockSupport
Assert.assertEquals(3, segmentsAndCommitMetadata.getCommitMetadata());
}
+ @Test(timeout = 60_000L)
+ public void testPublishReturnsAnnotatedShardSpecs() throws Exception
+ {
+ final TestCommitterSupplier<Integer> committerSupplier = new TestCommitterSupplier<>();
+ Assert.assertNull(driver.startJob(null));
+ for (int i = 0; i < ROWS.size(); i++) {
+ committerSupplier.setMetadata(i + 1);
+ Assert.assertTrue(driver.add(ROWS.get(i), "dummy", committerSupplier, false, true).isOk());
+ }
+
+ // Annotate function that swaps every segment to a DimensionValueSetShardSpec. The publisher echoes the annotated segments
+ // into the result, mimicking the metadata store recording the annotated spec.
+ final Function<Set<DataSegment>, Set<DataSegment>> toDimensionValueSet =
+ segments -> segments.stream()
+ .map(s -> s.withShardSpec(new DimensionValueSetShardSpec(
+ s.getShardSpec().getPartitionNum(),
+ s.getShardSpec().getNumCorePartitions(),
+ ImmutableMap.of("dim1", ImmutableList.of("x"))
+ )))
+ .collect(Collectors.toSet());
+ final TransactionalSegmentPublisher echoPublisher =
+ makePublisher(segmentsToPublish -> SegmentPublishResult.ok(new HashSet<>(segmentsToPublish)));
+
+ final SegmentsAndCommitMetadata published = driver.publish(
+ echoPublisher,
+ committerSupplier.get(),
+ ImmutableList.of("dummy"),
+ toDimensionValueSet
+ ).get(PUBLISH_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
+
+ Assert.assertFalse("Expected at least one published segment", published.getSegments().isEmpty());
+ for (DataSegment segment : published.getSegments()) {
+ Assert.assertTrue(
+ "Returned segment should carry the published DimensionValueSetShardSpec, not the pre-publish NumberedShardSpec; "
+ + "got " + segment.getShardSpec().getClass().getSimpleName(),
+ segment.getShardSpec() instanceof DimensionValueSetShardSpec
+ );
+ }
+ }
+
@Test
public void testMaxRowsPerSegment() throws Exception
{