abhishekrb19 opened a new pull request, #19571:
URL: https://github.com/apache/druid/pull/19571
Streaming-published segments currently use `NumberedShardSpec`, whose
`possibleInDomain()` always returns true, so the broker can't prune them by
query filter until they are compacted. This holds even if the topic is
partitioned, which is easy to do with multiple supervisors. For multi-tenant
datasources, this means every tenant-filtered query hits every recent segment.
This PR lets streaming tasks record, per published segment, the distinct
values observed for a configured set of dimensions, and declare them on a new
shard spec so the broker can prune near-realtime data without waiting for
compaction. Concurrent compaction sometimes cannot keep up with incoming data,
so the pruning benefits of `range` shard specs may not be realized for 30
minutes or more. Compaction also doesn't help high-concurrency query workloads
that only query recent data.
This functionality is opt-in, Kafka-only, and off by default.
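A minimal sketch of the per-segment bookkeeping described above, assuming each ingested row arrives as a `Map<String, ?>` and each segment is identified by a string id. `ObservedValuesCollector` and its methods are hypothetical names, not the PR's actual code. Missing values, and (by assumption) empty multi-value rows, are recorded as null so that a segment is never pruned for a query that matches nulls.

```java
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/**
 * Hypothetical sketch (not the PR's code): accumulates, per segment, the distinct values
 * observed for each configured partition-filter dimension.
 */
class ObservedValuesCollector
{
  // Sorted order with null first, so a null (missing) value can be stored in a TreeSet.
  private static final Comparator<String> NULLS_FIRST =
      Comparator.nullsFirst(Comparator.<String>naturalOrder());

  private final List<String> dimensions;
  // segmentId -> dimension -> distinct observed values
  private final Map<String, Map<String, Set<String>>> observed = new HashMap<>();

  ObservedValuesCollector(List<String> dimensions)
  {
    this.dimensions = List.copyOf(dimensions);
  }

  /** Records one ingested row against the segment it was appended to. */
  void observe(String segmentId, Map<String, ?> row)
  {
    Map<String, Set<String>> perDim = observed.computeIfAbsent(segmentId, id -> new HashMap<>());
    for (String dim : dimensions) {
      Set<String> values = perDim.computeIfAbsent(dim, d -> new TreeSet<>(NULLS_FIRST));
      Object raw = row.get(dim);
      if (raw instanceof List && !((List<?>) raw).isEmpty()) {
        // Multi-value row: record every element.
        for (Object v : (List<?>) raw) {
          values.add(v == null ? null : v.toString());
        }
      } else if (raw == null || raw instanceof List) {
        // Missing value or empty multi-value row (assumed to behave like null):
        // record null so that null-matching queries never prune this segment.
        values.add(null);
      } else {
        values.add(raw.toString());
      }
    }
  }

  /** Read-only view of the values to stamp onto the segment's shard spec at publish. */
  Map<String, Set<String>> partitionFiltersFor(String segmentId)
  {
    return Collections.unmodifiableMap(observed.getOrDefault(segmentId, Collections.emptyMap()));
  }
}
```

Because values are only ever added, restarts or overlapping tasks can at worst make a value set larger, which keeps pruning conservative rather than incorrect.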
**Design**
- `StreamRangeShardSpec` (type: `"stream_range"`): extends `NumberedShardSpec`
  (so it behaves as a normal append segment) and adds a `partitionFilters` map
  (dimension → observed values). `possibleInDomain()` prunes a segment when the
  query constrains a declared dimension and none of that dimension's values
  intersect the query's domain; a dimension not in `partitionFilters` is never
  pruned on. The check is set-based (not min/max), so it prunes precisely for
  sparse values and tolerates overlapping value sets across tasks/restarts.
- Because partition values are observed at ingestion rather than hardcoded,
  incorrect or abruptly changing partitioning never breaks correctness; at
  worst, it yields non-prunable or large shard specs.
- Ingestion (`SeekableStreamIndexTaskRunner`): when
  `partitionFilterDimensions` is set, the task accumulates observed values per
  segment and stamps them onto each segment's shard spec at publish time.
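To illustrate the set-based check, here is a dependency-free sketch. Druid's `ShardSpec.possibleInDomain()` takes Guava `RangeSet`s keyed by dimension; to stay self-contained, this sketch assumes a simplified domain of one `Predicate<String>` per constrained dimension. `StreamRangePruningSketch` is a hypothetical name, not the PR's `StreamRangeShardSpec`.

```java
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

/**
 * Hypothetical, dependency-free sketch of set-based pruning (not the PR's StreamRangeShardSpec).
 * The query domain is simplified to one Predicate per constrained dimension.
 */
class StreamRangePruningSketch
{
  // dimension -> distinct values observed in this segment at ingestion time
  private final Map<String, Set<String>> partitionFilters;

  StreamRangePruningSketch(Map<String, Set<String>> partitionFilters)
  {
    this.partitionFilters = Map.copyOf(partitionFilters);
  }

  /**
   * Returns false (prune) only if the query constrains a declared dimension and none of the
   * segment's observed values for it satisfy the constraint. Predicates must accept null.
   */
  boolean possibleInDomain(Map<String, Predicate<String>> domain)
  {
    for (Map.Entry<String, Set<String>> entry : partitionFilters.entrySet()) {
      Predicate<String> allowed = domain.get(entry.getKey());
      if (allowed == null) {
        continue; // the query does not constrain this dimension
      }
      if (entry.getValue().stream().noneMatch(allowed)) {
        return false; // no observed value can match, so the segment can be skipped
      }
    }
    // Dimensions constrained by the query but absent from partitionFilters never prune.
    return true;
  }
}
```

With observed values `{"a", "z"}`, a min/max spec would have to keep the segment for `tenant = 'm'` because `"m"` sorts between `"a"` and `"z"`; the set-based check prunes it.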
**Configuration**
A new optional IO config field, `partitionFilterDimensions`, is available on the
Kafka supervisor/task (default: null). When it is unset, behavior is unchanged.
It is documented in `docs/ingestion/kafka-ingestion.md`.
**Compatibility**
This change is backward-compatible and opt-in. However, `stream_range` is a new
core `ShardSpec` type with no `defaultImpl` fallback, so it is not
forward-compatible: upgrade all services before enabling
`partitionFilterDimensions`. Once `stream_range` segments are published,
downgrading isn't supported until they're compacted away.
**Results**
Tested in a cluster, where I saw up to a ~40% reduction in segment scans on the
historicals for a few low- to medium-cardinality partition dimensions. In a
follow-up, I want to extend this to also prune tasks, for reduced peon buffer
usage and better query performance at the task layer.
**Caveats**
There's currently no limit on the number of observed values stamped into a
segment's `partitionFilters`. In a follow-up, it may make sense to add a
configurable guardrail that falls back to `NumberedShardSpec` when the count
exceeds a threshold, so shard specs don't get bloated.
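The proposed guardrail might look like the following sketch. It is hypothetical: the PR does not implement it, and `PartitionFilterGuardrail` plus a limit on the *total* number of values across dimensions (rather than per dimension) are assumptions. Falling back to no filters is always safe, because a plain `NumberedShardSpec` never prunes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.Set;

/** Hypothetical follow-up guardrail; not part of this PR. */
final class PartitionFilterGuardrail
{
  private final int maxTotalValues;

  PartitionFilterGuardrail(int maxTotalValues)
  {
    this.maxTotalValues = maxTotalValues;
  }

  /**
   * Returns the filters to stamp on the segment, or empty to signal
   * "publish a plain NumberedShardSpec instead" when too many values were observed.
   */
  Optional<Map<String, Set<String>>> apply(Map<String, Set<String>> observed)
  {
    // Total number of distinct values across all declared dimensions.
    long total = observed.values().stream().mapToLong(Set::size).sum();
    return total > maxTotalValues ? Optional.empty() : Optional.of(observed);
  }
}
```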
This PR has:
- [x] been self-reviewed.
- [x] added documentation for new or modified features or behaviors.
- [ ] a release note entry in the PR description.
- [x] added Javadocs for most classes and all non-trivial methods. Linked
related entities via Javadoc links.
- [x] added comments explaining the "why" and the intent of the code
wherever it would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths,
ensuring the threshold for [code
coverage](https://github.com/apache/druid/blob/master/dev/code-review/code-coverage.md)
is met.
- [x] added integration tests.
- [x] been tested in a test Druid cluster.