Vishesh-Paliwal opened a new issue, #19573:
URL: https://github.com/apache/druid/issues/19573
### Description
Today, when ingestion uses `hashed` (or `range`) partitioning with a
`targetRowsPerSegment` / `maxRowsPerSegment`, that value is treated as a
*target assuming evenly distributed partition keys*. If the data is skewed —
i.e. a single partition-key value (e.g. one `orgId`) has far more rows than the
target — Druid silently produces an oversized segment. There is no warning,
metric, or task-report signal indicating that the configured target was
significantly exceeded. The ingestion task simply reports success.
This is because the number of shards is derived from the **cardinality** of
the partition dimension, not from the row distribution *within* each key:
```java
// ParallelIndexSupervisorTask.computeIntervalToNumShards()
estimatedNumShards = Math.round(estimatedCardinality / maxRowsPerSegment);
```
and every row with the same key deterministically hashes into the same
bucket:
```java
// HashPartitionFunction.MURMUR3_32_ABS
return Math.abs(Hashing.murmur3_32().hashBytes(serializedRow).asInt() % numBuckets);
```
So a single hot key cannot be split, and the resulting segment can be many
multiples of `maxRowsPerSegment` — with nothing surfaced to the operator.
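To make the effect concrete, here is a minimal, self-contained sketch of how one hot key dominates a bucket. It does not use Druid's classes: `HotKeySkewDemo`, `bucketFor`, and `rowsPerBucket` are hypothetical names, the row counts are made up, and `Arrays.hashCode` stands in for Murmur3 so the example has no dependencies. Only the `Math.abs(hash % numBuckets)` shape mirrors the snippet above.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

class HotKeySkewDemo {
    // Stand-in hash (assumption): Druid uses Murmur3, this uses Arrays.hashCode.
    // The property that matters is the same: equal keys always map to the same bucket.
    static int bucketFor(String partitionKey, int numBuckets) {
        int hash = Arrays.hashCode(partitionKey.getBytes(StandardCharsets.UTF_8));
        return Math.abs(hash % numBuckets);
    }

    // Sums the rows of every key that lands in each bucket.
    static Map<Integer, Long> rowsPerBucket(Map<String, Long> rowsPerKey, int numBuckets) {
        Map<Integer, Long> counts = new HashMap<>();
        for (Map.Entry<String, Long> entry : rowsPerKey.entrySet()) {
            counts.merge(bucketFor(entry.getKey(), numBuckets), entry.getValue(), Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long maxRowsPerSegment = 5_000_000L; // configured target (illustrative)
        int numBuckets = 10;                 // illustrative shard count

        Map<String, Long> rowsPerKey = new HashMap<>();
        for (int i = 0; i < 1_000; i++) {
            rowsPerKey.put("org-" + i, 1_000L); // many small tenants
        }
        rowsPerKey.put("org-hot", 50_000_000L); // one hot tenant

        Map<Integer, Long> counts = rowsPerBucket(rowsPerKey, numBuckets);
        long largest = Collections.max(counts.values());

        // The hot key cannot be spread across buckets, so the largest bucket
        // holds at least all of its rows regardless of numBuckets.
        System.out.printf(
            "largest bucket: %d rows (%.1fx the target)%n",
            largest,
            (double) largest / maxRowsPerSegment
        );
    }
}
```

Raising `numBuckets` in this sketch spreads the small tenants more thinly but leaves the hot tenant's bucket at 50 million rows or more, which is the behavior the warning is meant to expose.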
**Proposed change:** after segment generation, compare each segment's actual
row count against the configured `targetRowsPerSegment` / `maxRowsPerSegment`.
When a segment exceeds the target by more than a configurable ratio (with a
default of, say, `2.0`), emit a `WARN` log **and** surface it in the task report so it is
visible from the Overlord UI rather than buried in worker logs.
Concretely:
- Per-segment row counts are already available at push time via
`Appenderator.getRowCount(SegmentIdWithShardSpec)`.
- The natural hook is in the segment-generate tasks'
`createGeneratedPartitionsReport()` (`PartialHashSegmentGenerateTask` /
`PartialRangeSegmentGenerateTask`).
- Add a field to `IngestionStatsAndErrors` (e.g. `segmentsExceedingTarget`:
list of `{segmentId, rowCount, target}`) so the skew is reported as structured
task output, not just a log line.
- Make the warning ratio configurable with a sensible default; warning at
exactly `1.0×` would be too noisy given the "best-effort target" semantics.
Scope this to the native batch path first; the MSQ segment-generation path
(`FrameChannelHashPartitioner` etc.) can be a follow-up.
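The check itself could look roughly like the sketch below. This is not Druid code: `SegmentSkewCheck`, `OversizedSegment`, and `findSegmentsExceedingTarget` are hypothetical names, segment IDs are plain strings rather than `SegmentIdWithShardSpec`, and the row counts are invented. In a real patch the counts would come from the appenderator and the result would be written into the task report.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class SegmentSkewCheck {
    // Hypothetical report entry mirroring the proposed {segmentId, rowCount, target} shape.
    static final class OversizedSegment {
        final String segmentId;
        final long rowCount;
        final long target;

        OversizedSegment(String segmentId, long rowCount, long target) {
            this.segmentId = segmentId;
            this.rowCount = rowCount;
            this.target = target;
        }
    }

    // Returns the segments whose row count is strictly greater than target * warnRatio.
    static List<OversizedSegment> findSegmentsExceedingTarget(
        Map<String, Long> rowCountsBySegment,
        long target,
        double warnRatio
    ) {
        if (target <= 0 || warnRatio <= 0) {
            throw new IllegalArgumentException("target and warnRatio must be positive");
        }
        double threshold = target * warnRatio;
        List<OversizedSegment> oversized = new ArrayList<>();
        for (Map.Entry<String, Long> entry : rowCountsBySegment.entrySet()) {
            if (entry.getValue() > threshold) {
                oversized.add(new OversizedSegment(entry.getKey(), entry.getValue(), target));
            }
        }
        return oversized;
    }

    public static void main(String[] args) {
        Map<String, Long> rowCounts = new LinkedHashMap<>();
        rowCounts.put("seg-a", 4_800_000L);   // under the target
        rowCounts.put("seg-b", 9_900_000L);   // over the target, under 2.0x
        rowCounts.put("seg-c", 143_000_000L); // far over the target

        for (OversizedSegment s : findSegmentsExceedingTarget(rowCounts, 5_000_000L, 2.0)) {
            // A real implementation would log at WARN and add the entry to the task report.
            System.out.printf(
                "WARN segment %s has %d rows, %.1fx the target of %d%n",
                s.segmentId, s.rowCount, (double) s.rowCount / s.target, s.target
            );
        }
    }
}
```

With a ratio of `2.0` and a target of 5,000,000, only `seg-c` is flagged here; `seg-b` is nearly double the target but stays under the threshold, which illustrates why the ratio should be tunable.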
This is intentionally a small, observability-only change — it does not alter
partitioning behavior. It only makes existing skew **visible** so operators can
react (re-partition, add a partition dimension, etc.) instead of discovering
oversized segments via slow queries.
### Motivation
**Use case.** We run Druid with daily segment granularity and `hashed`
partitioning by `orgId`, with `maxRowsPerSegment` = 5,000,000. This works well
for the vast majority of tenants. However, we now have one very large tenant
generating roughly **5 billion logs per week**. Because all rows for a single
`orgId` hash to the same shard, that one org's data lands in a small number of
massively oversized segments — far beyond the 5M target — while the configured
limit suggests segments should be ~5M rows. The ingestion tasks report success
with no indication that the target was blown past by orders of magnitude.
The skew is also extremely **spiky**: within the same day this org can
produce ~20 million rows in one hour and ~300 million in another. So the
oversized-segment problem is not even uniform across time chunks — some
segments are catastrophically large and others are normal, which makes it hard
to notice without per-segment inspection.
We strongly suspect these oversized segments are a significant contributor
to **slower queries** for that tenant (and to uneven load across historicals),
but today there is nothing in Druid's output that points an operator at the
root cause. The first symptom is "queries are slow," and it takes manual
segment-size investigation to discover the skew.
**Rationale / benefits:**
- **Operability:** turns a silent, hard-to-diagnose condition into an
explicit, actionable signal at ingestion time.
- **Low risk:** purely additive observability; no change to partitioning
logic, segment layout, or query behavior.
- **Faster root-causing:** operators learn about skew when the data is
ingested, not weeks later via query latency regressions.
- **Foundation for a real fix:** the warning also gives us the diagnostic to
motivate and measure a follow-up that actually mitigates the skew.
**An open question for the maintainers:** given a distribution this skewed
and spiky, the standard advice is to add a secondary partition dimension to
break up the hot key. But that only helps if the secondary dimension actually
varies *within* the hot org, and it doesn't fully address the per-hour
spikiness. Is there a recommended approach today for splitting a single
oversized partition-key value across multiple segments while preserving
cross-key pruning — beyond adding a secondary dimension? I'd also like to
follow up with a separate **Proposal** for automatic overflow sub-partitioning
of skewed range keys (the `// Future improvement: Handle skewed partitions
better` TODO in `PartitionBoundaries` already points at this gap, I think). This
issue is intentionally scoped to just the warning; I would love to propose a
deeper fix separately if you feel it's worth it.
I'm happy to contribute the implementation for this warning.