schenksj opened a new issue, #3957:
URL: https://github.com/apache/datafusion-comet/issues/3957

   ## Context
   
   Comet's native Delta scan (PR #3932) translates Catalyst predicates to 
`delta-kernel-rs` predicates for file-level data skipping, then lets 
DataFusion's `ParquetSource` handle row-group / page-index / row-level 
filtering within each file.
   
   Comet's predicate translator (`native/core/src/delta/predicate.rs` → 
`catalyst_to_kernel_predicate_with_names`) currently handles the core 
comparisons (`=`, `!=`, `<`, `<=`, `>`, `>=`, `AND`, `OR`, `NOT`, `IS NULL`, 
`IS NOT NULL`, `IN`, `NOT IN`, `Cast`-unwrap). For any predicate shape it 
doesn't recognise, it falls through to `Predicate::unknown(...)`, which 
disables stats-based skipping for that leg (conservative, never incorrect).
   
   Delta's Scala `DataSkippingReader` supports several predicate shapes that 
Comet does not. On workloads that rely on those shapes, Comet reads more files 
than Delta's Scala path does for the same query, even though both paths end up 
with the same Parquet-level filtering once the files are opened.
   
   This is an umbrella issue tracking the four gaps I've identified while doing 
a side-by-side analysis of both paths. Each subtask is independently mergeable.
   
   ## Gaps
   
   ### 1. `StartsWith` file-level skipping
   
   **What Delta Scala does** (prefix-bounds comparison on truncated string 
stats):
   
   
`spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala:557`
   ```scala
   case StartsWith(SkippingEligibleColumn(a, _), v @ Literal(s: UTF8String, dt: StringType)) =>
     statsProvider.getPredicateWithStatTypes(a, dt, MIN, MAX) { (min, max) =>
       val sLen = s.numChars()
       substring(min, 0, sLen) <= v && substring(max, 0, sLen) >= v
     }
   ```
   
   A file can be skipped only if no row in it could begin with the literal 
prefix.
   
   **What Comet should do:** In `native/core/src/delta/predicate.rs`, add a 
case for `StartsWith` that emits the equivalent kernel predicate (the kernel 
supports substring / string comparison). This preserves file skipping for 
prefix-filtered string-key workloads (log tables keyed on `run_id`, 
`customer_prefix`, etc.).
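
   The prefix-bounds check can be sketched in Rust against plain min/max string 
stats. This is an illustrative standalone function, not Comet's actual 
`predicate.rs` API; the truncation mirrors the `substring(min, 0, sLen)` trick 
in the Scala excerpt above.

   ```rust
   /// Sketch of the file-skip decision for `col STARTS WITH prefix`,
   /// evaluated against a file's min/max string stats. Names are
   /// illustrative, not Comet's real API.
   fn can_skip_file_for_prefix(min: &str, max: &str, prefix: &str) -> bool {
       let n = prefix.chars().count();
       let trunc = |s: &str| s.chars().take(n).collect::<String>();
       // Keep the file only if min[..n] <= prefix <= max[..n]; otherwise no
       // row in the file can begin with the prefix, so it is safe to skip.
       !(trunc(min).as_str() <= prefix && trunc(max).as_str() >= prefix)
   }

   fn main() {
       // File stats: min = "apple", max = "cherry".
       // "banana..." rows could exist, so the file must be kept:
       assert!(!can_skip_file_for_prefix("apple", "cherry", "ba"));
       // No row can start with "zz", so the file can be skipped:
       assert!(can_skip_file_for_prefix("apple", "cherry", "zz"));
   }
   ```

   Truncated stats can only prove absence, never presence, so the helper errs 
on the side of keeping the file, matching the conservative behaviour of the 
rest of the translator.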
   
   ### 2. `EqualNullSafe` (`<=>`) file-level skipping
   
   **What Delta Scala does** (rewrite to handle null literal separately):
   
   
`spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala:508`
   ```scala
   case EqualNullSafe(a, v: Literal) =>
     val rewrittenExpr = if (v.value != null) And(IsNotNull(a), EqualTo(a, v)) else IsNull(a)
     constructDataFilters(rewrittenExpr)
   case EqualNullSafe(v: Literal, a) =>
     constructDataFilters(EqualNullSafe(a, v))
   case Not(EqualNullSafe(a, v: Literal)) =>
     val rewrittenExpr = if (v.value != null) And(IsNotNull(a), EqualTo(a, v)) else IsNull(a)
     constructDataFilters(Not(rewrittenExpr))
   ```
   
   Non-null literal → `IsNotNull AND EqualTo` (reuses min/max equality path).  
Null literal → `IsNull` (uses `nullCount` stats).
   
   **What Comet should do:** In `predicate.rs`, recognise `EqualNullSafe` 
(Catalyst's `<=>`) and apply the same rewrite before translation. Cheap, local 
change.
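
   The rewrite itself is mechanical. A minimal sketch on a toy expression tree 
(the `Expr` enum here is illustrative, not Comet's protobuf or the kernel's 
predicate types):

   ```rust
   #[derive(Debug, PartialEq)]
   enum Expr {
       Col(String),
       Lit(i64),
       IsNull(Box<Expr>),
       IsNotNull(Box<Expr>),
       EqualTo(Box<Expr>, Box<Expr>),
       And(Box<Expr>, Box<Expr>),
   }

   /// `col <=> lit` rewritten before translation:
   ///   non-null literal -> IsNotNull(col) AND col = lit  (reuses min/max path)
   ///   null literal     -> IsNull(col)                   (uses nullCount stats)
   fn rewrite_null_safe_eq(col: &str, lit: Option<i64>) -> Expr {
       let c = || Box::new(Expr::Col(col.to_string()));
       match lit {
           Some(v) => Expr::And(
               Box::new(Expr::IsNotNull(c())),
               Box::new(Expr::EqualTo(c(), Box::new(Expr::Lit(v)))),
           ),
           None => Expr::IsNull(c()),
       }
   }

   fn main() {
       // `a <=> null` is answerable from nullCount stats alone:
       assert_eq!(
           rewrite_null_safe_eq("a", None),
           Expr::IsNull(Box::new(Expr::Col("a".to_string())))
       );
       // `a <=> 5` reuses the existing equality translation:
       assert!(matches!(rewrite_null_safe_eq("a", Some(5)), Expr::And(_, _)));
   }
   ```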
   
   ### 3. Materialised `InSubqueryExec` file-level skipping
   
   **What Delta Scala does** (treat the resolved subquery as a literal IN-list):
   
   
`spark/src/main/scala/org/apache/spark/sql/delta/stats/DataSkippingReader.scala:571`
   ```scala
   case in: InSubqueryExec =>
     // At this point the subquery has been materialized, but values() can return None if
     // the subquery was bypassed at runtime.
     in.values().flatMap(v => constructLiteralInListDataFilters(in.child, v.toSeq))
   ```
   
   Once the subquery has run, its values are just a materialised `IN` list, so 
all the standard min/max-range logic applies.
   
   **What Comet should do:** Currently Comet strips subquery expressions before 
handing predicates to the kernel (see `CometDeltaNativeScan.scala` 
predicate-build path). This is correct for DPP *partition* pruning (which 
happens later at `doExecuteColumnar`), but it means **file-level** stats 
skipping misses out. Two possibilities:
   - (a) At planning time, if `InSubqueryExec.values()` is already `Some(...)`, 
unwrap it into a literal `IN` proto before kernel predicate build.
   - (b) Let it fall through as `unknown` (current behaviour) -- option (a) is 
a net gain.
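
   Option (a) amounts to a small fold at planning time. A hedged sketch of the 
shape, with illustrative types standing in for `InSubqueryExec.values()` and 
the kernel predicate proto:

   ```rust
   #[derive(Debug, PartialEq)]
   enum KernelPredicate {
       /// Literal IN list: the standard min/max-range logic applies.
       In { column: String, values: Vec<i64> },
       /// Stats-based skipping disabled for this leg (current behaviour).
       Unknown,
   }

   /// If the subquery's values are already materialised when the kernel
   /// predicate is built, fold them into a literal IN list; otherwise fall
   /// back to Unknown, mirroring the `values()` -> None case above.
   fn subquery_to_predicate(column: &str, materialised: Option<Vec<i64>>) -> KernelPredicate {
       match materialised {
           Some(values) => KernelPredicate::In { column: column.to_string(), values },
           None => KernelPredicate::Unknown, // subquery bypassed at runtime
       }
   }

   fn main() {
       assert_eq!(
           subquery_to_predicate("region", Some(vec![1, 2])),
           KernelPredicate::In { column: "region".to_string(), values: vec![1, 2] }
       );
       assert_eq!(subquery_to_predicate("region", None), KernelPredicate::Unknown);
   }
   ```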
   
   ### 4. Generated column partition filter derivation
   
   **What Delta Scala does** (derive partition filters from data filters for 
generated partition columns):
   
   `spark/src/main/scala/org/apache/spark/sql/delta/GeneratedColumn.scala:485` 
and `:497`
   ```scala
   def partitionFilterOptimizationEnabled(spark: SparkSession): Boolean =
     spark.sessionState.conf
       .getConf(DeltaSQLConf.GENERATED_COLUMN_PARTITION_FILTER_OPTIMIZATION_ENABLED)
   
   def generatePartitionFilters(...): Seq[Expression] = {
     ...
     val partitionFilters = dataFilters.flatMap { filter =>
       preprocess(filter) match {
         case LessThan(ExtractBaseColumn(nameParts, _), lit: Literal) =>
           toPartitionFilter(nameParts, _.lessThan(lit))
         case LessThanOrEqual(...) => toPartitionFilter(..., _.lessThanOrEqual(lit))
         case EqualTo(...) => toPartitionFilter(..., _.equalTo(lit))
         case GreaterThan(...) => toPartitionFilter(..., _.greaterThan(lit))
         case GreaterThanOrEqual(...) => toPartitionFilter(..., _.greaterThanOrEqual(lit))
         case IsNull(...) => toPartitionFilter(..., _.isNull())
         case _ => Nil
       }
     }
   }
   ```
   
   Each registered `OptimizablePartitionExpression` knows how to turn a 
data-column predicate into a partition-column predicate (e.g. `event_time > 
'X'` → `partition_date > date(X)`).
   
   **What Comet should do:** Port the registry for the common generated-column 
forms (`year(col)`, `date(col)`, `hour(col)`, truncation expressions) and 
derive additional partition filters before handing them to 
`CometDeltaNativeScan.prunePartitions`. Probably lives in a new helper called 
from `CometScanRule` right before `applyRowTrackingRewrite`.
   
   This is the heaviest of the four -- Delta's registry has a lot of special 
cases. Start with `date(timestamp)` and `year(timestamp)` since those cover the 
majority of real-world generated-partition-column tables.
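
   To make the derivation rule concrete for the `date(timestamp)` case: a data 
filter `event_time > t` implies the partition filter `partition_date >= 
date(t)` (note `>=`, not `>`, since rows with `event_time > t` can still land 
on day `date(t)`). A standalone sketch with epoch-second timestamps and 
epoch-day dates; all names are illustrative, not the proposed Comet helper:

   ```rust
   #[derive(Debug, PartialEq)]
   struct PartitionFilter {
       column: String,
       /// Derived lower bound: partition_date >= epoch_day.
       epoch_day: i64,
   }

   /// Derive a partition filter on a generated column
   /// `partition_date = date(event_time)` from a data filter
   /// `event_time > ts_lower_bound_secs`. `>=` is the tightest *safe*
   /// bound: a strict `>` would wrongly drop partitions for day date(t).
   fn derive_date_filter(partition_col: &str, ts_lower_bound_secs: i64) -> PartitionFilter {
       PartitionFilter {
           column: partition_col.to_string(),
           // div_euclid rounds toward negative infinity, so pre-1970
           // timestamps also map to the correct day.
           epoch_day: ts_lower_bound_secs.div_euclid(86_400),
       }
   }

   fn main() {
       // event_time > (day 3 + 100 s)  =>  partition_date >= day 3
       assert_eq!(derive_date_filter("partition_date", 3 * 86_400 + 100).epoch_day, 3);
       assert_eq!(derive_date_filter("partition_date", 0).epoch_day, 0);
   }
   ```

   The registry port would generalise this: one such derivation per 
`OptimizablePartitionExpression` form, each mapping a data-column bound to the 
corresponding partition-column bound.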
   
   ## Priority / rough effort
   
   | # | Gap | Effort | Impact |
   |---|---|---|---|
   | 1 | `StartsWith` | Small (pattern-match + kernel predicate) | Medium -- prefix-filtered string keys |
   | 2 | `EqualNullSafe` | Small (local rewrite) | Low -- uncommon predicate |
   | 3 | Materialised `InSubquery` | Small-Medium (plumbing pre-resolved values into predicate proto) | Medium -- joins feeding partition-level subqueries |
   | 4 | Generated column derivation | Medium-Large (expression registry port) | High for tables using generated partition columns |
   
   ## Acceptance criteria
   
   For each subtask: a correctness test in 
`spark/src/test/scala/org/apache/comet/` that verifies Comet reads the same set 
of files as Delta's Scala path for a representative query, and any regression 
tests in the Delta suite pass.
   
   ## Non-goals
   
   - Row-level filtering *within* a row group: already covered by DataFusion's 
`RowFilter` / `ArrowPredicate` and actually stronger than Delta's 
Spark-vectorised reader, which only does row-group-level pushdown. Not a gap.
   - DV + Parquet pushdown interaction: Comet is already stronger here (Delta 
disables Parquet pushdown when `useMetadataRowIndex=false`; Comet does not).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

