Dandandan commented on code in PR #20820:
URL: https://github.com/apache/datafusion/pull/20820#discussion_r2969281546


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -108,48 +132,136 @@ pub(super) struct ParquetOpener {
     pub enable_row_group_stats_pruning: bool,
     /// Coerce INT96 timestamps to specific TimeUnit
     pub coerce_int96: Option<TimeUnit>,
-    /// Optional parquet FileDecryptionProperties
-    #[cfg(feature = "parquet_encryption")]
-    pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
     /// Rewrite expressions in the context of the file schema
     pub(crate) expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
-    /// Optional factory to create file decryption properties dynamically
-    #[cfg(feature = "parquet_encryption")]
-    pub encryption_factory:
-        Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
+    /// Encryption configuration used to resolve per-file decryption 
properties.
+    pub(crate) encryption_context: EncryptionContext,
     /// Maximum size of the predicate cache, in bytes. If none, uses
     /// the arrow-rs default.
     pub max_predicate_cache_size: Option<usize>,
     /// Whether to read row groups in reverse order
     pub reverse_row_groups: bool,
 }
 
-impl FileOpener for ParquetOpener {
-    fn open(&self, partitioned_file: PartitionedFile) -> 
Result<FileOpenFuture> {
+impl ParquetMorselizer {
+    pub(crate) fn new(state: ParquetMorselizerState) -> Self {
+        Self {
+            state: Arc::new(state),
+        }
+    }
+}
+
+impl Deref for ParquetMorselizer {
+    type Target = ParquetMorselizerState;
+
+    fn deref(&self) -> &Self::Target {
+        &self.state
+    }
+}
+
+impl Debug for ParquetMorselizer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetMorselizer")
+            .field("partition_index", &self.partition_index)
+            .field("batch_size", &self.batch_size)
+            .field("limit", &self.limit)
+            .field("preserve_order", &self.preserve_order)
+            .field("metadata_size_hint", &self.metadata_size_hint)
+            .field("pushdown_filters", &self.pushdown_filters)
+            .field("reorder_filters", &self.reorder_filters)
+            .field("force_filter_selections", &self.force_filter_selections)
+            .field("enable_page_index", &self.enable_page_index)
+            .field("enable_bloom_filter", &self.enable_bloom_filter)
+            .field(
+                "enable_row_group_stats_pruning",
+                &self.enable_row_group_stats_pruning,
+            )
+            .field("coerce_int96", &self.coerce_int96)
+            .field("max_predicate_cache_size", &self.max_predicate_cache_size)
+            .field("reverse_row_groups", &self.reverse_row_groups)
+            .finish()
+    }
+}
+
+/// Result of preparing a PartitionedFile using CPU before any I/O.
+///
+/// This captures the state computed from `PartitionedFile`, the table schema,
+/// and scan configuration so that later planner states only need to perform
+/// async work such as metadata loading and stream construction.
+struct PreparedParquetOpen {
+    state: Arc<ParquetMorselizerState>,
+    partitioned_file: PartitionedFile,
+    file_range: Option<datafusion_datasource::FileRange>,

Review Comment:
   For the morsel work to function well, I think we need to:
   
   * Plan entire files only (no `FileRange` / no sharing of files across partitions), as sharing a file causes duplicated data reads / double work
   * Ideally: also start with a global queue of files (not spread across partitions, with no partition "owning" any file), as this helps to _globally_ order the file / morsel read order. In my view that would be beneficial for early tightening of dynamic filters, and for optimizations such as the ordering for aggregations that @adriangb mentioned.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to