Dandandan commented on code in PR #20820:
URL: https://github.com/apache/datafusion/pull/20820#discussion_r2982524025
##########
datafusion/datasource-parquet/src/opener.rs:
##########
@@ -108,48 +132,136 @@ pub(super) struct ParquetOpener {
pub enable_row_group_stats_pruning: bool,
/// Coerce INT96 timestamps to specific TimeUnit
pub coerce_int96: Option<TimeUnit>,
- /// Optional parquet FileDecryptionProperties
- #[cfg(feature = "parquet_encryption")]
- pub file_decryption_properties: Option<Arc<FileDecryptionProperties>>,
/// Rewrite expressions in the context of the file schema
pub(crate) expr_adapter_factory: Arc<dyn PhysicalExprAdapterFactory>,
- /// Optional factory to create file decryption properties dynamically
- #[cfg(feature = "parquet_encryption")]
- pub encryption_factory:
- Option<(Arc<dyn EncryptionFactory>, EncryptionFactoryOptions)>,
+    /// Encryption configuration used to resolve per-file decryption properties.
+ pub(crate) encryption_context: EncryptionContext,
/// Maximum size of the predicate cache, in bytes. If none, uses
/// the arrow-rs default.
pub max_predicate_cache_size: Option<usize>,
/// Whether to read row groups in reverse order
pub reverse_row_groups: bool,
}
-impl FileOpener for ParquetOpener {
-    fn open(&self, partitioned_file: PartitionedFile) -> Result<FileOpenFuture> {
+impl ParquetMorselizer {
+ pub(crate) fn new(state: ParquetMorselizerState) -> Self {
+ Self {
+ state: Arc::new(state),
+ }
+ }
+}
+
+impl Deref for ParquetMorselizer {
+ type Target = ParquetMorselizerState;
+
+ fn deref(&self) -> &Self::Target {
+ &self.state
+ }
+}
+
+impl Debug for ParquetMorselizer {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("ParquetMorselizer")
+ .field("partition_index", &self.partition_index)
+ .field("batch_size", &self.batch_size)
+ .field("limit", &self.limit)
+ .field("preserve_order", &self.preserve_order)
+ .field("metadata_size_hint", &self.metadata_size_hint)
+ .field("pushdown_filters", &self.pushdown_filters)
+ .field("reorder_filters", &self.reorder_filters)
+ .field("force_filter_selections", &self.force_filter_selections)
+ .field("enable_page_index", &self.enable_page_index)
+ .field("enable_bloom_filter", &self.enable_bloom_filter)
+ .field(
+ "enable_row_group_stats_pruning",
+ &self.enable_row_group_stats_pruning,
+ )
+ .field("coerce_int96", &self.coerce_int96)
+ .field("max_predicate_cache_size", &self.max_predicate_cache_size)
+ .field("reverse_row_groups", &self.reverse_row_groups)
+ .finish()
+ }
+}
+
+/// Result of preparing a PartitionedFile using CPU before any I/O.
+///
+/// This captures the state computed from `PartitionedFile`, the table schema,
+/// and scan configuration so that later planner states only need to perform
+/// async work such as metadata loading and stream construction.
+struct PreparedParquetOpen {
+ state: Arc<ParquetMorselizerState>,
+ partitioned_file: PartitionedFile,
+ file_range: Option<datafusion_datasource::FileRange>,
Review Comment:
Ahhh — all the benchmarks read all the parquet footers for all files upfront,
so for the benchmarks we don't have any misses, even for single-file queries.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]