alamb commented on code in PR #11444:
URL: https://github.com/apache/datafusion/pull/11444#discussion_r1676430454


##########
datafusion/common/src/config.rs:
##########
@@ -454,6 +470,80 @@ config_namespace! {
     }
 }
 
+#[cfg(feature = "parquet")]
+impl ParquetOptions {
+    /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
+    ///
+    /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options
+    /// applied per column; a customization which is not applicable for [`ParquetOptions`].
+    pub fn writer_props_from_global_opts(&self) -> Result<WriterPropertiesBuilder> {
+        let ParquetOptions {
+            data_pagesize_limit,
+            write_batch_size,
+            writer_version,
+            compression,
+            dictionary_enabled,
+            dictionary_page_size_limit,
+            statistics_enabled,
+            max_statistics_size,
+            max_row_group_size,
+            created_by,
+            column_index_truncate_length,
+            data_page_row_count_limit,
+            encoding,
+            bloom_filter_on_write,
+            bloom_filter_fpp,
+            bloom_filter_ndv,
+
+            // not in WriterProperties

Review Comment:
   This is a nice list -- it makes clear which options are supported at which level.



##########
datafusion/common/src/config.rs:
##########
@@ -1421,6 +1511,84 @@ impl TableParquetOptions {
     }
 }
 
+#[cfg(feature = "parquet")]
+impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {

Review Comment:
   This file is already pretty large -- could you move the code into `datafusion/common/src/file_options/parquet_writer.rs` instead? Since `TableParquetOptions` has all `pub` fields, I think this would be feasible.

   Also, doing that would keep the configuration settings independent of parquet (long term we are hoping to make it easier to have parquet enabled/disabled).
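   
   Roughly, I'd imagine something like the following living in `parquet_writer.rs` (just a sketch -- `apply_column_options` here is a hypothetical stand-in for the per-column mapping this PR already does in `config.rs`):
   
   ```rust
   // Sketch only: the conversion moved into
   // datafusion/common/src/file_options/parquet_writer.rs
   use parquet::file::properties::WriterPropertiesBuilder;
   
   use crate::config::TableParquetOptions;
   use crate::error::{DataFusionError, Result};
   
   impl TryFrom<&TableParquetOptions> for WriterPropertiesBuilder {
       type Error = DataFusionError;
   
       fn try_from(table_opts: &TableParquetOptions) -> Result<Self> {
           // Start from the session-level (global) options ...
           let mut builder = table_opts.global.writer_props_from_global_opts()?;
   
           // ... then layer per-column overrides on top.
           for (path, col_opts) in &table_opts.column_specific_options {
               // hypothetical helper; the real mapping stays as in this PR
               builder = apply_column_options(builder, path, col_opts)?;
           }
           Ok(builder)
       }
   }
   ```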



##########
datafusion/common/src/config.rs:
##########
@@ -454,6 +470,80 @@ config_namespace! {
     }
 }
 
+#[cfg(feature = "parquet")]
+impl ParquetOptions {
+    /// Convert the global session options, [`ParquetOptions`], into a single write action's [`WriterPropertiesBuilder`].
+    ///
+    /// The returned [`WriterPropertiesBuilder`] can then be further modified with additional options
+    /// applied per column; a customization which is not applicable for [`ParquetOptions`].
+    pub fn writer_props_from_global_opts(&self) -> Result<WriterPropertiesBuilder> {

Review Comment:
   I think this is basically the API described in https://github.com/apache/datafusion/issues/11367 (cc @Rachelint)
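   
   i.e. per write it would be used roughly like this (a sketch; the wrapper function name is just for illustration):
   
   ```rust
   use datafusion_common::config::ParquetOptions;
   use datafusion_common::Result;
   use parquet::file::properties::WriterProperties;
   
   // Illustrative helper: convert the session-level options once per write,
   // leaving room to apply per-column overrides before building.
   fn props_for_single_write(session_opts: &ParquetOptions) -> Result<WriterProperties> {
       let builder = session_opts.writer_props_from_global_opts()?;
       // (per-column customizations could be applied to `builder` here)
       Ok(builder.build())
   }
   ```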



##########
datafusion/common/src/config.rs:
##########
@@ -1874,4 +2042,193 @@ mod tests {
         let parsed_metadata = table_config.parquet.key_value_metadata;
         assert_eq!(parsed_metadata.get("key_dupe"), Some(&Some("B".into())));
     }
+
+    #[cfg(feature = "parquet")]
+    mod test_conversion_from_session_to_writer_props {
+        use super::super::*;
+        use parquet::{basic::Compression, file::properties::EnabledStatistics};
+
+        const COL_NAME: &str = "configured";
+
+        /// Take the column defaults provided in [`ParquetOptions`], and generate a non-default col config.
+        fn column_options_with_non_defaults(
+            src_col_defaults: &ParquetOptions,
+        ) -> ColumnOptions {
+            ColumnOptions {
+                compression: Some("zstd(22)".into()),
+                dictionary_enabled: src_col_defaults.dictionary_enabled.map(|v| !v),
+                statistics_enabled: Some("page".into()),
+                max_statistics_size: Some(72),
+                encoding: Some("RLE".into()),
+                bloom_filter_enabled: Some(true),
+                bloom_filter_fpp: Some(0.72),
+                bloom_filter_ndv: Some(72),
+            }
+        }
+
+        fn parquet_options_with_non_defaults() -> ParquetOptions {
+            let defaults = ParquetOptions::default();
+            let writer_version = if defaults.writer_version.eq("1.0") {
+                "2.0"
+            } else {
+                "1.0"
+            };
+
+            ParquetOptions {
+                data_pagesize_limit: 42,
+                write_batch_size: 42,
+                writer_version: writer_version.into(),
+                compression: Some("zstd(22)".into()),
+                dictionary_enabled: Some(!defaults.dictionary_enabled.unwrap_or(false)),
+                dictionary_page_size_limit: 42,
+                statistics_enabled: Some("chunk".into()),
+                max_statistics_size: Some(42),
+                max_row_group_size: 42,
+                created_by: "wordy".into(),
+                column_index_truncate_length: Some(42),
+                data_page_row_count_limit: 42,
+                encoding: Some("BYTE_STREAM_SPLIT".into()),
+                bloom_filter_on_write: !defaults.bloom_filter_on_write,
+                bloom_filter_fpp: Some(0.42),
+                bloom_filter_ndv: Some(42),
+
+                // not in WriterProperties, but itemizing here to not skip newly added props
+                enable_page_index: Default::default(),
+                pruning: Default::default(),
+                skip_metadata: Default::default(),
+                metadata_size_hint: Default::default(),
+                pushdown_filters: Default::default(),
+                reorder_filters: Default::default(),
+                allow_single_file_parallelism: Default::default(),
+                maximum_parallel_row_group_writers: Default::default(),
+                maximum_buffered_record_batches_per_stream: Default::default(),
+                bloom_filter_on_read: Default::default(),
+            }
+        }
+
+        fn extract_column_options(
+            props: &WriterProperties,
+            col: ColumnPath,
+        ) -> ColumnOptions {
+            let bloom_filter_default_props = props.bloom_filter_properties(&col);
+
+            ColumnOptions {
+                bloom_filter_enabled: Some(bloom_filter_default_props.is_some()),
+                encoding: props.encoding(&col).map(|s| s.to_string()),
+                dictionary_enabled: Some(props.dictionary_enabled(&col)),
+                compression: match props.compression(&col) {
+                    Compression::ZSTD(lvl) => {
+                        Some(format!("zstd({})", lvl.compression_level()))
+                    }
+                    _ => None,
+                },
+                statistics_enabled: Some(
+                    match props.statistics_enabled(&col) {
+                        EnabledStatistics::None => "none",
+                        EnabledStatistics::Chunk => "chunk",
+                        EnabledStatistics::Page => "page",
+                    }
+                    .into(),
+                ),
+                bloom_filter_fpp: bloom_filter_default_props.map(|p| p.fpp),
+                bloom_filter_ndv: bloom_filter_default_props.map(|p| p.ndv),
+                max_statistics_size: Some(props.max_statistics_size(&col)),
+            }
+        }
+
+        /// For testing only, take a single write's props and convert back into the session config.
+        /// (use identity to confirm correct.)
+        fn session_config_from_writer_props(
+            props: &WriterProperties,
+        ) -> TableParquetOptions {
+            let default_col = ColumnPath::from("col doesn't have specific config");
+            let default_col_props = extract_column_options(props, default_col);
+
+            let configured_col = ColumnPath::from(COL_NAME);
+            let configured_col_props = extract_column_options(props, configured_col);
+
+            let key_value_metadata = props
+                .key_value_metadata()
+                .map(|pairs| {
+                    HashMap::from_iter(
+                        pairs
+                            .iter()
+                            .cloned()
+                            .map(|KeyValue { key, value }| (key, value)),
+                    )
+                })
+                .unwrap_or_default();
+
+            TableParquetOptions {
+                global: ParquetOptions {
+                    // global options
+                    data_pagesize_limit: props.data_page_size_limit(),
+                    write_batch_size: props.write_batch_size(),
+                    writer_version: format!("{}.0", props.writer_version().as_num()),
+                    dictionary_page_size_limit: props.dictionary_page_size_limit(),
+                    max_row_group_size: props.max_row_group_size(),
+                    created_by: props.created_by().to_string(),
+                    column_index_truncate_length: props.column_index_truncate_length(),
+                    data_page_row_count_limit: props.data_page_row_count_limit(),
+
+                    // global options which set the default column props
+                    encoding: default_col_props.encoding,
+                    compression: default_col_props.compression,
+                    dictionary_enabled: default_col_props.dictionary_enabled,
+                    statistics_enabled: default_col_props.statistics_enabled,
+                    max_statistics_size: default_col_props.max_statistics_size,
+                    bloom_filter_on_write: default_col_props
+                        .bloom_filter_enabled
+                        .unwrap_or_default(),
+                    bloom_filter_fpp: default_col_props.bloom_filter_fpp,
+                    bloom_filter_ndv: default_col_props.bloom_filter_ndv,
+
+                    // not in WriterProperties
+                    enable_page_index: Default::default(),
+                    pruning: Default::default(),
+                    skip_metadata: Default::default(),
+                    metadata_size_hint: Default::default(),
+                    pushdown_filters: Default::default(),
+                    reorder_filters: Default::default(),
+                    allow_single_file_parallelism: Default::default(),
+                    maximum_parallel_row_group_writers: Default::default(),
+                    maximum_buffered_record_batches_per_stream: Default::default(),
+                    bloom_filter_on_read: Default::default(),
+                },
+                column_specific_options: HashMap::from([(
+                    COL_NAME.into(),
+                    configured_col_props,
+                )]),
+                key_value_metadata,
+            }
+        }
+
+        #[test]

Review Comment:
   Maybe we could also add a test here like the one described in https://github.com/apache/datafusion/issues/11367 -- one that takes a default `ParquetOptions`, creates the `ArrowWriterOptions`, and then ensures the fields are the same as `ArrowWriterOptions::default()`
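   
   Something along these lines, perhaps (a rough sketch comparing against `WriterProperties::default()`, which `ArrowWriterOptions` wraps; the asserted fields are just a representative subset):
   
   ```rust
   use parquet::file::properties::WriterProperties;
   
   #[test]
   fn test_defaults_match_parquet_writer_defaults() {
       // A default `ParquetOptions`, converted to writer props, should line up
       // with the parquet writer's own defaults.
       let default_opts = ParquetOptions::default();
       let props = default_opts.writer_props_from_global_opts().unwrap().build();
       let parquet_defaults = WriterProperties::default();
   
       // Representative subset; a real test would cover every field (or
       // round-trip via `session_config_from_writer_props` above).
       assert_eq!(props.data_page_size_limit(), parquet_defaults.data_page_size_limit());
       assert_eq!(props.write_batch_size(), parquet_defaults.write_batch_size());
       assert_eq!(props.max_row_group_size(), parquet_defaults.max_row_group_size());
   }
   ```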
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

