Dandandan commented on code in PR #20820:
URL: https://github.com/apache/datafusion/pull/20820#discussion_r2969281546
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -108,48 +132,136 @@ pub(super) struct ParquetOpener {
pub enable_row_group_stats_pruning: bool,
/// Coerce INT96 timestamps to specific TimeUnit
pub coerce_int96: Option<TimeUnit>,
- /// Optional parquet FileDecryptionProperties
- #[cfg(feature = "parquet_encryption")]
- pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
/// Rewrite expressions in the context of the file schema
pub(crate) expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
- /// Optional factory to create file decryption properties dynamically
- #[cfg(feature = "parquet_encryption")]
- pub encryption_factory:
- Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
+ /// Encryption configuration used to resolve per-file decryption
properties.
+ pub(crate) encryption_context: EncryptionContext,
/// Maximum size of the predicate cache, in bytes. If none, uses
/// the arrow-rs default.
pub max_predicate_cache_size: Option<usize>,
/// Whether to read row groups in reverse order
pub reverse_row_groups: bool,
}
-impl FileOpener for ParquetOpener {
- fn open(&self, partitioned_file: PartitionedFile) ->
Result<FileOpenFuture> {
+impl ParquetMorselizer {
+ pub(crate) fn new(state: ParquetMorselizerState) -> Self {
+ Self {
+ state: Arc::new(state),
+ }
+ }
+}
+
+impl Deref for ParquetMorselizer {
+ type Target = ParquetMorselizerState;
+
+ fn deref(&self) -> &Self::Target {
+ &self.state
+ }
+}
+
+impl Debug for ParquetMorselizer {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("ParquetMorselizer")
+ .field("partition_index", &self.partition_index)
+ .field("batch_size", &self.batch_size)
+ .field("limit", &self.limit)
+ .field("preserve_order", &self.preserve_order)
+ .field("metadata_size_hint", &self.metadata_size_hint)
+ .field("pushdown_filters", &self.pushdown_filters)
+ .field("reorder_filters", &self.reorder_filters)
+ .field("force_filter_selections", &self.force_filter_selections)
+ .field("enable_page_index", &self.enable_page_index)
+ .field("enable_bloom_filter", &self.enable_bloom_filter)
+ .field(
+ "enable_row_group_stats_pruning",
+ &self.enable_row_group_stats_pruning,
+ )
+ .field("coerce_int96", &self.coerce_int96)
+ .field("max_predicate_cache_size", &self.max_predicate_cache_size)
+ .field("reverse_row_groups", &self.reverse_row_groups)
+ .finish()
+ }
+}
+
+/// Result of preparing a PartitionedFile using CPU before any I/O.
+///
+/// This captures the state computed from `PartitionedFile`, the table schema,
+/// and scan configuration so that later planner states only need to perform
+/// async work such as metadata loading and stream construction.
+struct PreparedParquetOpen {
+ state: Arc<ParquetMorselizerState>,
+ partitioned_file: PartitionedFile,
+ file_range: Option<datafusion_datasource::FileRange>,
Review Comment:
For the morsel work to function well, I think we need to:
* Plan entire files only (no `FileRange` / file sharing), since sharing files
causes duplicated data reads / double work.
* Ideally, start with a global queue of files as well (not spread across
partitions, so that no single partition "owns" any file). This helps to
_globally_ order the file read order / morsel read order. In my view that would
be beneficial for early tightening of dynamic filters, and for optimizations
such as the ordering for aggregations that @adriangb mentioned.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]