asolimando commented on code in PR #19957:
URL: https://github.com/apache/datafusion/pull/19957#discussion_r2911488857
##########
datafusion/common/src/stats.rs:
##########
@@ -660,7 +637,14 @@ impl Statistics {
col_stats.max_value =
col_stats.max_value.max(&item_col_stats.max_value);
col_stats.min_value =
col_stats.min_value.min(&item_col_stats.min_value);
col_stats.sum_value =
col_stats.sum_value.add(&item_col_stats.sum_value);
- col_stats.distinct_count = Precision::Absent;
+ // Use max as a conservative lower bound for distinct count
+ // (can't accurately merge NDV since duplicates may exist across partitions)
Review Comment:
> I'm concerned that the conservative lower-bound estimation could cause huge inaccuracy.
>
> For example, we have 100 parts, and each part has 100 rows, and they are linearly increasing. The reality is that the ndv should be 100 * 100, but now we evaluate it as 100. The result could cause inaccuracies to propagate throughout subsequent cost estimation algorithms.
You raise a good point: the true NDV across partitions lies in `[max(partition NDVs), sum(partition NDVs)]`. The lower bound (`max`) is tight when the partitions' value sets fully overlap (e.g., a `status` column), and the upper bound (`sum`) is tight when they are completely disjoint (e.g., `order_id`), so no single strategy is optimal.
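To make the interval concrete, here is a minimal standalone sketch (illustrative Rust, not DataFusion's `Precision` API) of the bounds the merged NDV must fall into:

```rust
/// Bounds on the merged NDV given per-partition NDVs.
/// `max` assumes fully overlapping value sets across partitions;
/// `sum` assumes fully disjoint ones. The true NDV lies in between.
fn ndv_bounds(partition_ndvs: &[u64]) -> (u64, u64) {
    let lower = partition_ndvs.iter().copied().max().unwrap_or(0);
    let upper: u64 = partition_ndvs.iter().copied().sum();
    (lower, upper)
}
```

For the 100-partitions-of-100-rows example above the bounds are `(100, 10_000)`, so the `max` strategy deliberately picks the pessimistic end of that interval.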
This is how different major OSS systems deal with the problem:
- Trino: `max` across partitions
([source](https://github.com/trinodb/trino/blob/9c1abc44e162ae8c63fcc07d072c56a16def1ed6/plugin/trino-hive/src/main/java/io/trino/plugin/hive/statistics/AbstractHiveStatisticsProvider.java#L678-L688)).
Known to be problematic with disjoint domains:
[trinodb/trino#50](https://github.com/trinodb/trino/issues/50) shows 62K
estimated vs 150M actual. This is what I considered in the current proposal.
- Spark: `ANALYZE TABLE` runs a full-table `HyperLogLogPlusPlus` aggregation
([source](https://github.com/apache/spark/blob/5932f11fb91606606234f486f289be1358b1d829/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala#L392)),
so there are no per-partition NDVs to merge, and this strategy doesn't fit the
per-partition merge we need here.
- Hive: stores HLL bitvector sketches per partition in the metastore, merges
them at planning time ([HLL
merge](https://github.com/apache/hive/blob/925df927248c1be11b84df3e8a0c3e43955e133e/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/common/ndv/hll/HyperLogLog.java#L443-L485),
[aggregator
merge](https://github.com/apache/hive/blob/925df927248c1be11b84df3e8a0c3e43955e133e/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java#L88-L100)),
and
[interpolates](https://github.com/apache/hive/blob/925df927248c1be11b84df3e8a0c3e43955e133e/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/columnstats/aggr/StringColumnStatsAggregator.java#L200-L201)
for missing partitions by merging adjacent bitvectors and extrapolating. Note
that this is exactly what
[trinodb/trino#50](https://github.com/trinodb/trino/issues/50) suggests, but in
this first PR I aimed for simplicity.
- DuckDB: HLL merge with Good-Turing sampling correction
([merge](https://github.com/duckdb/duckdb/blob/19a12e9b74386a9e212811ef0c04014b04baede5/src/storage/statistics/distinct_statistics.cpp#L19-L23),
[Good-Turing](https://github.com/duckdb/duckdb/blob/19a12e9b74386a9e212811ef0c04014b04baede5/src/storage/statistics/distinct_statistics.cpp#L53-L68)),
but only for its native format. No HLL data available when reading Parquet.
Good-Turing won't help here, but it could be used for
https://github.com/apache/datafusion/pull/19957#discussion_r2897575666 to
extrapolate NDVs from a sample of the whole population, though the sampling
would need to be random (cc: @jonathanc-n, perhaps as a complement to the
threshold strategy?).
> HyperLogLog is a classic way to process this, but for a large dataset, it
> takes a long time to process.
The core constraint is that Parquet stores NDV as a single `optional i64`
([spec](https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L301)),
not a sketch, and most writers don't even set it (see
[apache/arrow-rs#8608](https://github.com/apache/arrow-rs/issues/8608)). So
when merging row groups, we're interpolating from scalar values.
Regarding HLL cost: sketches are compact (~1.6KB for 2% error), merge is
`O(registers)`, so it's cheap at planning time. The cost is at ingestion. Data
sketches are standard practice at scale: Hive uses HLL for NDV and KLL for
histograms/quantiles (e.g.,
[HIVE-26243](https://issues.apache.org/jira/browse/HIVE-26243),
[HIVE-26221](https://issues.apache.org/jira/browse/HIVE-26221)). In extreme
cases sketch size might be a concern, but it's not the general case in my
experience.
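To illustrate why the merge itself is cheap, here is a toy register-wise HLL union (register count chosen for illustration; real implementations pack registers and apply bias correction at estimation time):

```rust
/// Toy HyperLogLog sketch union: merging two sketches is just the
/// element-wise max over a fixed-size register array, independent of
/// how many rows each sketch has observed.
const REGISTERS: usize = 2048; // illustrative; real sizes depend on target error

fn merge_hll(a: &[u8; REGISTERS], b: &[u8; REGISTERS]) -> [u8; REGISTERS] {
    let mut merged = [0u8; REGISTERS];
    for i in 0..REGISTERS {
        merged[i] = a[i].max(b[i]); // O(registers) per merge
    }
    merged
}
```

The expensive part, hashing every row to populate the registers, happens once at ingestion; planning-time merges only touch the fixed-size arrays.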
> IIRC, the merge method may be called during query execution, so using
> HyperLogLog here isn't ideal.
To clarify: `try_merge` is only called at planning time (by
`partition_statistics(None)` in optimizer rules like `JoinSelection`, and
during file group metadata collection), not during query execution. This is
orthogonal to statistics-based pruning work like
[#19609](https://github.com/apache/datafusion/pull/19609).
I mentioned `JoinSelection` as a concrete use of NDV, but NDV can be used to
drive other planning decisions
(https://github.com/apache/datafusion/issues/20766 covers many of them). Some
examples:
- Aggregation pushdown: NDV estimates the grouping reduction ratio.
Overestimating NDV makes the optimizer skip beneficial pushdowns;
underestimating makes it push down an aggregation expecting a reduction that
doesn't materialize.
- Hash table sizing via NDV: underestimate causes resizing/spilling,
overestimate wastes memory.
- Filters (NDV for IN/equality selectivity): underestimating makes a filter
push-down look more appealing than it is, while overestimating might make us
skip pushing the filter down.
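As a toy illustration of the first bullet (hypothetical helper, not a DataFusion API), the grouping reduction ratio an optimizer would estimate is simply NDV over row count:

```rust
/// Estimated fraction of input rows surviving a GROUP BY on a column:
/// roughly ndv / row_count, clamped so the ratio never exceeds 1.
fn grouping_reduction_ratio(ndv: u64, row_count: u64) -> f64 {
    if row_count == 0 {
        return 0.0;
    }
    ndv.min(row_count) as f64 / row_count as f64
}
```

With 1M rows and a true NDV of 100, the ratio is 0.0001 and pushdown is clearly worthwhile; an NDV overestimated at 900K yields ~0.9 and the optimizer would skip it.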
Bottom line: there is no universally "safe" direction; the impact depends on
how the statistic is consumed. Since the consumers of NDV aren't fully defined
yet, there is no strong guidance available.
Longer term, if we invest in CBO and statistics (as
[#8227](https://github.com/apache/datafusion/issues/8227) and
[#20766](https://github.com/apache/datafusion/issues/20766) suggest), the "one
size fits all" problem will be exacerbated for downstream projects. Systems
like Hive, Spark, or DuckDB, being end-user databases/warehouses, can make hard
choices (e.g., Hive mandating HLL in the metastore). DataFusion is a framework,
so we can't take one single route with no way to override. Decision points need
to be tunable: what and how you read stats, how to propagate them through
operators and expressions, and how to use them for planning decisions, for both
built-in and custom stats, expressions, and operators.
I'm working on this via a `StatisticsProvider`/`StatisticsRegistry` pattern
([WIP
branch](https://github.com/asolimando/datafusion/tree/asolimando/statistics-planner-prototype)),
including `ExtendedStatistics` for custom data (histograms, sketches, etc.).
For reading custom stats from Parquet, [user-defined Parquet
indexes](https://datafusion.apache.org/blog/2025/07/14/user-defined-parquet-indexes/)
could provide the ingestion mechanism: embed HLL sketches alongside Parquet
data and read them at planning time through the provider chain.
Until we have that, maybe we should gate this new propagation mechanism behind
a configuration property, so we are free to experiment without causing
unexpected changes to existing planning. For NDV I haven't done so because, as
discussed, standard writers don't usually set it, so I don't expect any impact
unless you make a conscious effort to write and consume these statistics; but
if we feel unsure I can add that safeguard.
Apologies for the wall of text! This discussion deserves its own space, but I
wanted to address your concern here; happy to take the discussion to an RFC or
a dedicated issue, as you see fit.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]