mbutrovich commented on code in PR #3536:
URL: https://github.com/apache/datafusion-comet/pull/3536#discussion_r2854464577
##########
native/core/src/execution/operators/iceberg_scan.rs:
##########
@@ -251,38 +258,34 @@ where
Poll::Ready(Some(Ok(batch))) => {
let file_schema = batch.schema();
-                // Check if we need to create a new adapter for this file's schema
-                let needs_new_adapter = match &self.cached_adapter {
-                    Some((cached_schema, _)) => !Arc::ptr_eq(cached_schema, &file_schema),
-                    None => true,
+                // Reuse cached projection expressions if file schema hasn't changed,
+                // otherwise create a new adapter and build new expressions
+                let projection_exprs = match &self.cached {
+                    Some(cached) if cached.file_schema.as_ref() == file_schema.as_ref() => {
Review Comment:
This is using a deep `Schema ==` comparison instead of `Arc::ptr_eq`. Since iceberg-rust reuses the same `Arc<Schema>` for all batches from a file, `ptr_eq` was faster.
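A minimal std-only sketch of the distinction (the `Schema` struct here is a hypothetical stand-in for arrow's type, not the real one):

```rust
use std::sync::Arc;

// Stand-in for arrow's Schema: deep `==` walks every field, while
// `Arc::ptr_eq` is a single pointer comparison. Illustrative type only.
#[derive(PartialEq)]
pub struct Schema {
    pub field_names: Vec<String>,
}

/// O(1) identity check the review suggests: true only when both Arcs
/// point at the same allocation, which iceberg-rust guarantees for all
/// batches read from one file.
pub fn same_file_schema(cached: &Arc<Schema>, incoming: &Arc<Schema>) -> bool {
    Arc::ptr_eq(cached, incoming)
}
```

A structurally equal schema in a different allocation still fails `ptr_eq`, so the deep comparison is only needed if the same logical schema can arrive in distinct `Arc`s.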
##########
native/core/src/execution/operators/iceberg_scan.rs:
##########
@@ -313,3 +316,48 @@ impl DisplayAs for IcebergScanExec {
)
}
}
+
+/// Build projection expressions that adapt batches from a file schema to the target schema.
+///
+/// The returned expressions can be cached and reused across multiple batches
+/// that share the same file schema, avoiding repeated expression construction.
+fn build_projection_expressions(
+    target_schema: &SchemaRef,
+    adapter: &Arc<dyn PhysicalExprAdapter>,
+) -> DFResult<Vec<Arc<dyn PhysicalExpr>>> {
+    target_schema
+        .fields()
+        .iter()
+        .enumerate()
+        .map(|(i, _field)| {
+            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema(
+                target_schema.field(i).name(),
+                target_schema.as_ref(),
+            )?);
+            adapter.rewrite(col_expr)
+        })
+        .collect::<DFResult<Vec<_>>>()
+}
+
+/// Adapt a batch to match the target schema using pre-built projection expressions.
+///
+/// The caller provides pre-built `projection_exprs` (from [`build_projection_expressions`])
+/// which can be cached and reused across multiple batches with the same file schema.
+fn adapt_batch_with_expressions(
+    batch: RecordBatch,
+    target_schema: &SchemaRef,
+    projection_exprs: &[Arc<dyn PhysicalExpr>],
+) -> DFResult<RecordBatch> {
+    // If schemas match, no adaptation needed
+    if batch.schema().as_ref() == target_schema.as_ref() {
Review Comment:
This is using a deep `Schema ==` comparison instead of `Arc::ptr_eq`. Since iceberg-rust reuses the same `Arc<Schema>` for all batches from a file, `ptr_eq` was faster.
Also, this check might be redundant if I'm understanding the control flow correctly: if the caller has cached projection expressions, it should apply them unconditionally. If we do need the check, use `Arc::ptr_eq` instead.
##########
native/core/src/execution/operators/iceberg_scan.rs:
##########
@@ -313,3 +316,48 @@ impl DisplayAs for IcebergScanExec {
)
}
}
+
+/// Build projection expressions that adapt batches from a file schema to the target schema.
+///
+/// The returned expressions can be cached and reused across multiple batches
+/// that share the same file schema, avoiding repeated expression construction.
+fn build_projection_expressions(
+    target_schema: &SchemaRef,
+    adapter: &Arc<dyn PhysicalExprAdapter>,
+) -> DFResult<Vec<Arc<dyn PhysicalExpr>>> {
+    target_schema
+        .fields()
+        .iter()
+        .enumerate()
+        .map(|(i, _field)| {
+            let col_expr: Arc<dyn PhysicalExpr> = Arc::new(Column::new_with_schema(
+                target_schema.field(i).name(),
+                target_schema.as_ref(),
+            )?);
+            adapter.rewrite(col_expr)
+        })
+        .collect::<DFResult<Vec<_>>>()
+}
+
+/// Adapt a batch to match the target schema using pre-built projection expressions.
+///
+/// The caller provides pre-built `projection_exprs` (from [`build_projection_expressions`])
+/// which can be cached and reused across multiple batches with the same file schema.
+fn adapt_batch_with_expressions(
+    batch: RecordBatch,
+    target_schema: &SchemaRef,
+    projection_exprs: &[Arc<dyn PhysicalExpr>],
+) -> DFResult<RecordBatch> {
+    // If schemas match, no adaptation needed
+    if batch.schema().as_ref() == target_schema.as_ref() {
+        return Ok(batch);
+    }
+
+    // Evaluate expressions against batch
+    let columns: Vec<ArrayRef> = projection_exprs
Review Comment:
This might be a future optimization if I'm understanding the control flow correctly, but for default value/schema change columns we're allocating a new `ArrayRef` every time. It seems like there are caching opportunities here. Maybe file an issue to optimize default value/identity/schema change columns in the expression adapter.
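A hypothetical sketch of that caching idea: a default-value column is one constant repeated `num_rows` times, so its materialized array could be memoized per batch length instead of re-allocated for every batch. `Vec<i64>` stands in for an arrow `ArrayRef`; all names here are illustrative, not the expression adapter's API:

```rust
use std::collections::HashMap;
use std::sync::Arc;

// Memoizes the materialized array for one default-value column, keyed by
// batch length (batches from the same file often share a row count).
pub struct DefaultColumnCache {
    pub default_value: i64, // stand-in for a ScalarValue
    pub by_len: HashMap<usize, Arc<Vec<i64>>>,
}

impl DefaultColumnCache {
    /// Return the constant column for a batch of `num_rows` rows, allocating
    /// only on the first request for each length.
    pub fn get(&mut self, num_rows: usize) -> Arc<Vec<i64>> {
        let v = self.default_value;
        Arc::clone(
            self.by_len
                .entry(num_rows)
                .or_insert_with(|| Arc::new(vec![v; num_rows])),
        )
    }
}
```

Since arrow arrays are immutable behind `Arc`, handing the same `Arc` to multiple output batches is safe; the cache only pays off when batch lengths repeat, which is the common case for fixed-size batch readers.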
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]