2010YOUY01 commented on code in PR #16268: URL: https://github.com/apache/datafusion/pull/16268#discussion_r2141401180
########## datafusion/common/src/config.rs: ########## @@ -274,6 +276,60 @@ config_namespace! { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SpillCompression { + Zstd, + Lz4Frame, + Uncompressed, +} + +impl FromStr for SpillCompression { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_ascii_lowercase().as_str() { + "zstd" => Ok(Self::Zstd), + "lz4_frame" => Ok(Self::Lz4Frame), + "uncompressed" | "" => Ok(Self::Uncompressed), + other => Err(DataFusionError::Execution(format!( + "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4, uncompressed" Review Comment: ```suggestion "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4_frame, uncompressed" ``` ########## datafusion/common/src/config.rs: ########## @@ -274,6 +276,60 @@ config_namespace! { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SpillCompression { + Zstd, + Lz4Frame, + Uncompressed, +} + +impl FromStr for SpillCompression { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_ascii_lowercase().as_str() { + "zstd" => Ok(Self::Zstd), + "lz4_frame" => Ok(Self::Lz4Frame), + "uncompressed" | "" => Ok(Self::Uncompressed), + other => Err(DataFusionError::Execution(format!( Review Comment: `DataFusionError::Config` can be more suitable here. ########## datafusion/common/src/config.rs: ########## @@ -330,6 +386,13 @@ config_namespace! { /// the new schema verification step. pub skip_physical_aggregate_schema_check: bool, default = false + /// Sets the compression codec used when spilling data to disk. + /// + /// Since datafusion writes spill files using the Arrow IPC Stream format, + /// only codecs supported by the Arrow IPC Stream Writer are allowed. 
+ /// Valid values are: uncompressed, lz4_frame, zstd + pub spill_compression: SpillCompression, default = SpillCompression::Uncompressed Review Comment: `lz4` is likely to provide the best speed/space tradeoff; we'll run some benchmarks to confirm it. ########## datafusion/common/src/config.rs: ########## @@ -330,6 +386,13 @@ config_namespace! { /// the new schema verification step. pub skip_physical_aggregate_schema_check: bool, default = false + /// Sets the compression codec used when spilling data to disk. + /// + /// Since datafusion writes spill files using the Arrow IPC Stream format, + /// only codecs supported by the Arrow IPC Stream Writer are allowed. + /// Valid values are: uncompressed, lz4_frame, zstd Review Comment: I suggest mentioning this in the doc (as a quick reminder): lz4 compresses faster but has a lower compression ratio, whereas zstd compresses more slowly but achieves a higher compression ratio. We can run some experiments in the future to make this information more specific, such as the measured compression ratio/speed on TPC-H tables in Arrow batches. ########## datafusion/physical-plan/src/spill/spill_manager.rs: ########## @@ -44,16 +44,23 @@ pub struct SpillManager { schema: SchemaRef, /// Number of batches to buffer in memory during disk reads batch_read_buffer_capacity: usize, - // TODO: Add general-purpose compression options + /// general-purpose compression options + pub(crate) compression: SpillCompression, } impl SpillManager { - pub fn new(env: Arc<RuntimeEnv>, metrics: SpillMetrics, schema: SchemaRef) -> Self { + pub fn new( + env: Arc<RuntimeEnv>, + metrics: SpillMetrics, + schema: SchemaRef, + compression: SpillCompression, Review Comment: I suggest using a builder pattern to set this new field. The reason is that `SpillManager` is a public interface that can be used outside the crate, so it's better to keep the API stable.
To do that, use a default compression option in `new()`, and add a `with_compression_type()` function to change the default compression type. ########## datafusion/common/src/config.rs: ########## @@ -274,6 +276,60 @@ config_namespace! { } } +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum SpillCompression { + Zstd, + Lz4Frame, + Uncompressed, +} + +impl FromStr for SpillCompression { + type Err = DataFusionError; + + fn from_str(s: &str) -> Result<Self, Self::Err> { + match s.to_ascii_lowercase().as_str() { + "zstd" => Ok(Self::Zstd), + "lz4_frame" => Ok(Self::Lz4Frame), + "uncompressed" | "" => Ok(Self::Uncompressed), + other => Err(DataFusionError::Execution(format!( + "Invalid Spill file compression type: {other}. Expected one of: zstd, lz4, uncompressed" + ))), + } + } +} + +impl ConfigField for SpillCompression { + fn visit<V: Visit>(&self, v: &mut V, key: &str, description: &'static str) { + v.some(key, self, description) + } + + fn set(&mut self, _: &str, value: &str) -> Result<()> { + *self = SpillCompression::from_str(value)?; + Ok(()) + } +} + +impl Display for SpillCompression { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let str = match self { + Self::Zstd => "zstd", + Self::Lz4Frame => "lz4_frame", + Self::Uncompressed => "", Review Comment: Should this be the "uncompressed" literal? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org