schenksj opened a new issue, #3957:
URL: https://github.com/apache/datafusion-comet/issues/3957
## Context
Comet's native Delta scan (PR #3932) translates Catalyst predicates to
`delta-kernel-rs` predicates for file-level data skipping, then lets
DataFusion's `ParquetSource` handle row-group / page-index / row-level
filtering within each file.
Comet's predicate translator (`native/core/src/delta/predicate.rs` →
`catalyst_to_kernel_predicate_with_names`) currently handles the core
comparisons (`=`, `!=`, `<`, `<=`, `>`, `>=`, `AND`, `OR`, `NOT`, `IS NULL`,
`IS NOT NULL`, `IN`, `NOT IN`, `Cast`-unwrap). For any predicate shape it
doesn't recognise, it falls through to `Predicate::unknown(...)`, which
disables stats-based skipping for that leg (conservative, never incorrect).
Delta's Scala `DataSkippingReader` supports several predicate shapes that
Comet does not. On workloads that rely on those shapes, Comet reads more files
than Delta's Scala path does for the same query, even though both paths end up
with the same Parquet-level filtering once the files are opened.
This is an umbrella issue tracking the four gaps I've identified in a
side-by-side analysis of the two paths. Each subtask is independently mergeable.
## Gaps
### 1. `StartsWith` file-level skipping
**What Delta Scala does** (prefix-bounds comparison on truncated string
stats):
`spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala:557`
```scala
case StartsWith(SkippingEligibleColumn(a, _), v @ Literal(s: UTF8String, dt: StringType)) =>
  statsProvider.getPredicateWithStatTypes(a, dt, MIN, MAX) { (min, max) =>
    val sLen = s.numChars()
    substring(min, 0, sLen) <= v && substring(max, 0, sLen) >= v
  }
```
A file can be skipped only if no row in it could begin with the literal
prefix.
**What Comet should do:** In `native/core/src/delta/predicate.rs`, add a
case for `StartsWith` that emits the equivalent kernel predicate (kernel
supports substring / string comparison). This preserves file skipping for
prefix-filtered string-key workloads (log tables keyed on `run_id`,
`customer_prefix`, etc.).
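The check the kernel predicate needs to encode can be sketched as a
standalone helper (a hypothetical function, not the delta-kernel-rs API;
it mirrors the truncated min/max comparison in the Scala rule above):

```rust
/// Hypothetical sketch of the prefix-bounds test behind
/// `col STARTS WITH prefix` file skipping. A file must be kept when some
/// value in [min, max] could begin with `prefix`, i.e. when min truncated
/// to the prefix length is <= prefix and max truncated to the prefix
/// length is >= prefix; otherwise it can be skipped.
fn keep_file_for_starts_with(min: &str, max: &str, prefix: &str) -> bool {
    let n = prefix.chars().count();
    let truncate = |s: &str| s.chars().take(n).collect::<String>();
    truncate(min).as_str() <= prefix && truncate(max).as_str() >= prefix
}
```

For example, a file with `min = "apple"`, `max = "banana"` must be kept for
prefix `"az"` (`"ap" <= "az" <= "ba"` after truncation), while one with
`min = "car"`, `max = "dog"` can be skipped.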
### 2. `EqualNullSafe` (`<=>`) file-level skipping
**What Delta Scala does** (rewrite to handle null literal separately):
`spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala:508`
```scala
case EqualNullSafe(a, v: Literal) =>
  val rewrittenExpr = if (v.value != null) And(IsNotNull(a), EqualTo(a, v)) else IsNull(a)
  constructDataFilters(rewrittenExpr)
case EqualNullSafe(v: Literal, a) =>
  constructDataFilters(EqualNullSafe(a, v))
case Not(EqualNullSafe(a, v: Literal)) =>
  val rewrittenExpr = if (v.value != null) And(IsNotNull(a), EqualTo(a, v)) else IsNull(a)
  constructDataFilters(Not(rewrittenExpr))
```
Non-null literal → `IsNotNull AND EqualTo` (reuses min/max equality path).
Null literal → `IsNull` (uses `nullCount` stats).
**What Comet should do:** In `predicate.rs`, recognise `EqualNullSafe`
(Catalyst's `<=>`) and apply the same rewrite before translation. This is a
cheap, local change.
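A minimal sketch of the rewrite on a toy expression tree (all types here are
illustrative stand-ins, not Comet's actual proto or kernel types):

```rust
#[derive(Debug, Clone, PartialEq)]
enum Expr {
    Col(String),
    Lit(Option<i64>), // None models a null literal
    IsNull(Box<Expr>),
    IsNotNull(Box<Expr>),
    Eq(Box<Expr>, Box<Expr>),
    And(Box<Expr>, Box<Expr>),
}

/// `col <=> lit` rewritten into stats-friendly shapes, mirroring Delta's
/// rule: a non-null literal becomes `IsNotNull(col) AND col = lit` (reusing
/// the min/max equality path); a null literal becomes `IsNull(col)`
/// (answerable from nullCount stats).
fn rewrite_null_safe_eq(col: Expr, lit: Option<i64>) -> Expr {
    match lit {
        Some(v) => Expr::And(
            Box::new(Expr::IsNotNull(Box::new(col.clone()))),
            Box::new(Expr::Eq(Box::new(col), Box::new(Expr::Lit(Some(v))))),
        ),
        None => Expr::IsNull(Box::new(col)),
    }
}
```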
### 3. Materialised `InSubqueryExec` file-level skipping
**What Delta Scala does** (treat the resolved subquery as a literal IN-list):
`spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala:571`
```scala
case in: InSubqueryExec =>
  // At this point the subquery has been materialized, but values() can
  // return None if the subquery was bypassed at runtime.
  in.values().flatMap(v => constructLiteralInListDataFilters(in.child, v.toSeq))
```
Once the subquery has run, its values are just a materialised `IN` list, so
all the standard min/max-range logic applies.
**What Comet should do:** Currently Comet strips subquery expressions before
handing predicates to the kernel (see `CometDeltaNativeScan.scala`
predicate-build path). This is correct for DPP *partition* pruning (which
happens later at `doExecuteColumnar`), but it means **file-level** stats
skipping misses out. Two possibilities:
- (a) At planning time, if `InSubqueryExec.values()` is already `Some(...)`,
unwrap it into a literal `IN` proto before kernel predicate build.
- (b) Let it fall through as `unknown` (current behaviour). Option (a) is a
net gain, since it only adds skipping opportunities.
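Option (a) amounts to the following shape (all names hypothetical; `Unknown`
stands in for the `Predicate::unknown` fallback):

```rust
/// Toy model of a materialised IN-subquery: `values` is `Some(..)` once the
/// subquery has run, `None` if it was bypassed at runtime.
struct InSubquery {
    values: Option<Vec<i64>>,
}

#[derive(Debug, PartialEq)]
enum KernelPred {
    /// Literal IN-list: the standard min/max-range skipping logic applies.
    InList(String, Vec<i64>),
    /// Disables stats-based skipping for this leg; conservative, never wrong.
    Unknown,
}

fn translate_in_subquery(col: &str, sub: &InSubquery) -> KernelPred {
    match &sub.values {
        // Pre-resolved values: unwrap into a literal IN-list predicate.
        Some(vs) => KernelPred::InList(col.to_string(), vs.clone()),
        // Bypassed subquery: fall back, as Comet currently does for all subqueries.
        None => KernelPred::Unknown,
    }
}
```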
### 4. Generated column partition filter derivation
**What Delta Scala does** (derive partition filters from data filters for
generated partition columns):
`spark/src/main/scala/org/apache/spark/sql/delta/GeneratedColumn.scala:485`
and `:497`
```scala
def partitionFilterOptimizationEnabled(spark: SparkSession): Boolean =
  spark.sessionState.conf
    .getConf(DeltaSQLConf.GENERATED_COLUMN_PARTITION_FILTER_OPTIMIZATION_ENABLED)

def generatePartitionFilters(...): Seq[Expression] = {
  ...
  val partitionFilters = dataFilters.flatMap { filter =>
    preprocess(filter) match {
      case LessThan(ExtractBaseColumn(nameParts, _), lit: Literal) =>
        toPartitionFilter(nameParts, _.lessThan(lit))
      case LessThanOrEqual(...) => toPartitionFilter(..., _.lessThanOrEqual(lit))
      case EqualTo(...) => toPartitionFilter(..., _.equalTo(lit))
      case GreaterThan(...) => toPartitionFilter(..., _.greaterThan(lit))
      case GreaterThanOrEqual(...) => toPartitionFilter(..., _.greaterThanOrEqual(lit))
      case IsNull(...) => toPartitionFilter(..., _.isNull())
      case _ => Nil
    }
  }
```
Each registered `OptimizablePartitionExpression` knows how to turn a
data-column predicate into a partition-column predicate (e.g. `event_time >
'X'` → `partition_date > date(X)`).
**What Comet should do:** Port the registry for the common generated-column
forms (`year(col)`, `date(col)`, `hour(col)`, truncation expressions) and
derive additional partition filters before handing them to
`CometDeltaNativeScan.prunePartitions`. Probably lives in a new helper called
from `CometScanRule` right before `applyRowTrackingRewrite`.
This is the heaviest of the four -- Delta's registry has a lot of special
cases. Start with `date(timestamp)` and `year(timestamp)` since those cover the
majority of real-world generated-partition-column tables.
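For the `date(timestamp)` case, the main subtlety is that the derived
comparison must be weakened to stay sound: `ts > X` only implies
`date(ts) >= date(X)`, because many timestamps map to the same date. A
hypothetical standalone sketch (epoch-second timestamps, epoch-day partition
values; not the Delta registry API):

```rust
#[derive(Debug, Clone, Copy, PartialEq)]
enum CmpOp {
    Lt,
    Le,
    Eq,
    Gt,
    Ge,
}

/// Derive a partition filter on a generated column `p = date(ts)` from a
/// data filter `ts <op> lit`. Strict comparisons are widened to their
/// non-strict forms because the date truncation is only monotone
/// non-decreasing, not injective.
fn derive_date_partition_filter(op: CmpOp, ts_lit_secs: i64) -> (CmpOp, i64) {
    // Floor division so negative (pre-epoch) timestamps round toward -inf.
    let day = ts_lit_secs.div_euclid(86_400);
    match op {
        CmpOp::Gt | CmpOp::Ge => (CmpOp::Ge, day), // ts > X  =>  date(ts) >= date(X)
        CmpOp::Lt | CmpOp::Le => (CmpOp::Le, day), // ts < X  =>  date(ts) <= date(X)
        CmpOp::Eq => (CmpOp::Eq, day),             // ts = X  =>  date(ts) =  date(X)
    }
}
```

So a data filter like `event_time > '1970-01-01 00:01:40'` yields the
additional partition filter `partition_date >= '1970-01-01'`, which
`CometDeltaNativeScan.prunePartitions` could then apply as usual.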
## Priority / rough effort
| # | Gap | Effort | Impact |
|---|-----|--------|--------|
| 1 | `StartsWith` | Small (pattern-match + kernel predicate) | Medium -- prefix-filtered string keys |
| 2 | `EqualNullSafe` | Small (local rewrite) | Low -- uncommon predicate |
| 3 | Materialised `InSubquery` | Small-Medium (plumbing pre-resolved values into predicate proto) | Medium -- joins feeding partition-level subqueries |
| 4 | Generated column derivation | Medium-Large (expression registry port) | High for tables using generated partition columns |
## Acceptance criteria
For each subtask: a correctness test in
`spark/src/test/scala/org/apache/comet/` that verifies Comet reads the same set
of files as Delta's Scala path for a representative query, and any regression
tests in the Delta suite pass.
## Non-goals
- Row-level filtering *within* a row group: already covered by DataFusion's
`RowFilter` / `ArrowPredicate` and actually stronger than Delta's
Spark-vectorised reader, which only does row-group-level pushdown. Not a gap.
- DV + Parquet pushdown interaction: Comet is already stronger here (Delta
disables Parquet pushdown when `useMetadataRowIndex=false`; Comet does not).
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]