alamb commented on code in PR #20820:
URL: https://github.com/apache/datafusion/pull/20820#discussion_r2969546070


##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -108,48 +132,136 @@ pub(super) struct ParquetOpener {
     pub enable_row_group_stats_pruning: bool,
     /// Coerce INT96 timestamps to specific TimeUnit
     pub coerce_int96: Option<TimeUnit>,
-    /// Optional parquet FileDecryptionProperties
-    #[cfg(feature = "parquet_encryption")]
-    pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
     /// Rewrite expressions in the context of the file schema
     pub(crate) expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
-    /// Optional factory to create file decryption properties dynamically
-    #[cfg(feature = "parquet_encryption")]
-    pub encryption_factory:
-        Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
+    /// Encryption configuration used to resolve per-file decryption 
properties.
+    pub(crate) encryption_context: EncryptionContext,
     /// Maximum size of the predicate cache, in bytes. If none, uses
     /// the arrow-rs default.
     pub max_predicate_cache_size: Option<usize>,
     /// Whether to read row groups in reverse order
     pub reverse_row_groups: bool,
 }
 
-impl FileOpener for ParquetOpener {
-    fn open(&self, partitioned_file: PartitionedFile) -> 
Result<FileOpenFuture> {
+impl ParquetMorselizer {
+    pub(crate) fn new(state: ParquetMorselizerState) -> Self {
+        Self {
+            state: Arc::new(state),
+        }
+    }
+}
+
+impl Deref for ParquetMorselizer {
+    type Target = ParquetMorselizerState;
+
+    fn deref(&self) -> &Self::Target {
+        &self.state
+    }
+}
+
+impl Debug for ParquetMorselizer {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("ParquetMorselizer")
+            .field("partition_index", &self.partition_index)
+            .field("batch_size", &self.batch_size)
+            .field("limit", &self.limit)
+            .field("preserve_order", &self.preserve_order)
+            .field("metadata_size_hint", &self.metadata_size_hint)
+            .field("pushdown_filters", &self.pushdown_filters)
+            .field("reorder_filters", &self.reorder_filters)
+            .field("force_filter_selections", &self.force_filter_selections)
+            .field("enable_page_index", &self.enable_page_index)
+            .field("enable_bloom_filter", &self.enable_bloom_filter)
+            .field(
+                "enable_row_group_stats_pruning",
+                &self.enable_row_group_stats_pruning,
+            )
+            .field("coerce_int96", &self.coerce_int96)
+            .field("max_predicate_cache_size", &self.max_predicate_cache_size)
+            .field("reverse_row_groups", &self.reverse_row_groups)
+            .finish()
+    }
+}
+
+/// Result of preparing a PartitionedFile using CPU before any I/O.
+///
+/// This captures the state computed from `PartitionedFile`, the table schema,
+/// and scan configuration so that later planner states only need to perform
+/// async work such as metadata loading and stream construction.
+struct PreparedParquetOpen {
+    state: Arc<ParquetMorselizerState>,
+    partitioned_file: PartitionedFile,
+    file_range: Option<datafusion_datasource::FileRange>,

Review Comment:
   This FileRange comes from the initial plan (i.e. it is basically the result 
of the "static" scheduler that assigned portions of files to different 
partitions).
   
   > Plan entire files only (no FileRange / sharing files) as it causes 
duplicated data reads / double work
   
   I think the file range is the range used to decide which row groups to 
read (not just the IO).
   
   The idea of a global queue (when ordering isn't needed) is probably a good 
one though



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to