Dandandan commented on code in PR #19639:
URL: https://github.com/apache/datafusion/pull/19639#discussion_r2809847688
##########
datafusion/datasource-parquet/src/row_filter.rs:
##########
@@ -654,6 +711,211 @@ pub fn build_row_filter(
.map(|filters| Some(RowFilter::new(filters)))
}
+/// Build a [`RowFilter`] from the given predicate expressions, returning per-expression metrics.
+///
+/// This is similar to [`build_row_filter`] but additionally returns [`FilterMetrics`] for each
+/// filter expression. The metrics can be read after stream processing completes to update
+/// selectivity statistics.
+///
+/// # Arguments
+/// * `predicates` - The filter predicates, already adapted to reference columns in `file_schema`
+/// * `file_schema` - The Arrow schema of the parquet file
+/// * `metadata` - Parquet file metadata used for cost estimation
+/// * `reorder_predicates` - If true, reorder predicates to minimize I/O
+/// * `file_metrics` - Metrics for tracking filter performance
+/// * `selectivity_tracker` - Tracker containing effectiveness data for filter reordering
+///
+/// # Returns
+/// * `Ok(Some(result))` containing the row filter and per-expression metrics
+/// * `Ok(None)` if no expressions can be used as a RowFilter
+/// * `Err(e)` if an error occurs while building the filter
+pub fn build_row_filter_with_metrics(
+    predicates: Vec<Arc<dyn PhysicalExpr>>,
+    file_schema: &SchemaRef,
+    metadata: &ParquetMetaData,
+    reorder_predicates: bool,
+    file_metrics: &ParquetFileMetrics,
+    selectivity_tracker: &SelectivityTracker,
+) -> Result<Option<RowFilterWithMetrics>> {
+    let rows_pruned = &file_metrics.pushdown_rows_pruned;
+    let rows_matched = &file_metrics.pushdown_rows_matched;
+    let time = &file_metrics.row_pushdown_eval_time;
+
+    // Determine which conjuncts can be evaluated as ArrowPredicates, if any.
+    // We need to preserve the original expressions before building candidates.
+    let mut candidates_with_exprs: Vec<(Arc<dyn PhysicalExpr>, FilterCandidate)> =
+        predicates
+            .into_iter()
+            .filter_map(|expr| {
+                let original_expr = Arc::clone(&expr);
+                FilterCandidateBuilder::new(expr, Arc::clone(file_schema))
+                    .build(metadata)
+                    .ok()
+                    .flatten()
+                    .map(|candidate| (original_expr, candidate))
+            })
+            .collect();
+
+    // no candidates
+    if candidates_with_exprs.is_empty() {
+        return Ok(None);
+    }
+
+    if reorder_predicates {
+        candidates_with_exprs.sort_unstable_by(|(_, c1), (_, c2)| {
Review Comment:
AFAIK `build_row_filter_with_metrics` only runs _once_, on file `open`?
Which means that for all files/partitions that are opened directly at start it will not do anything.
(E.g. for TPCH / TPCDS this does not help much, as the number of files is limited; it only helps when partitions are started one after another.)
`clickbench_partitioned` consists of 100 files - so it might help more there.
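To make the timing issue concrete, here is a minimal, self-contained sketch (the `SelectivityTracker` below is an illustrative stand-in, not this PR's actual type, and the file names are made up): feedback recorded after one stream completes can only influence files opened later, so files opened concurrently at startup never see it.

```rust
use std::sync::{Arc, Mutex};

/// Illustrative stand-in for the PR's selectivity tracker; not the real API.
#[derive(Default)]
struct SelectivityTracker {
    /// Observed matched-row fractions per filter expression, populated only
    /// after some file's stream has finished.
    observed: Mutex<Vec<f64>>,
}

impl SelectivityTracker {
    fn has_data(&self) -> bool {
        !self.observed.lock().unwrap().is_empty()
    }
    fn record(&self, selectivities: Vec<f64>) {
        *self.observed.lock().unwrap() = selectivities;
    }
}

fn main() {
    let tracker = Arc::new(SelectivityTracker::default());

    // Files opened together at startup all see an empty tracker, so
    // predicate reordering can only fall back to the static cost heuristic.
    for file in ["part-0.parquet", "part-1.parquet"] {
        assert!(!tracker.has_data());
        println!("{file}: reorder by static cost (no feedback yet)");
    }

    // Metrics flow back only once a stream completes...
    tracker.record(vec![0.05, 0.9]);

    // ...so only a file opened *after* that benefits from observed selectivity.
    assert!(tracker.has_data());
    println!("part-2.parquet: reorder using observed selectivity");
}
```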