Re: [PR] Use partial aggregation schema for spilling to avoid column mismatch in GroupedHashAggregateStream [datafusion]
kosiew commented on code in PR #13995: URL: https://github.com/apache/datafusion/pull/13995#discussion_r1904829349

## datafusion/physical-plan/src/aggregates/row_hash.rs:

```diff
@@ -522,7 +527,7 @@ impl GroupedHashAggregateStream {
         let spill_state = SpillState {
             spills: vec![],
             spill_expr,
-            spill_schema: Arc::clone(&agg_schema),
+            spill_schema: partial_agg_schema,
```

Review Comment: Thanks for confirming. The lines are removed.

--
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] Default to ZSTD compression when writing Parquet [datafusion-python]
kevinjqliu commented on code in PR #981: URL: https://github.com/apache/datafusion-python/pull/981#discussion_r1904816353

## python/datafusion/dataframe.py:

```diff
@@ -620,16 +620,25 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
     def write_parquet(
         self,
         path: str | pathlib.Path,
-        compression: str = "uncompressed",
+        compression: str = "ZSTD",
         compression_level: int | None = None,
```

Review Comment: wydt about

```suggestion
        compression_level: int = 4,
```
Re: [PR] Default to ZSTD compression when writing Parquet [datafusion-python]
kosiew commented on code in PR #981: URL: https://github.com/apache/datafusion-python/pull/981#discussion_r1904826148

## python/datafusion/dataframe.py:

```diff
@@ -620,16 +620,25 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
     def write_parquet(
         self,
         path: str | pathlib.Path,
-        compression: str = "uncompressed",
+        compression: str = "ZSTD",
        compression_level: int | None = None,
```

Review Comment: Thanks @kevinjqliu. I like the simplification, but I thought it might give the wrong impression that 4 is a suitable default for other compression types, e.g. SNAPPY, GZIP, BROTLI, LZ4, as well.
Re: [I] supports_filters_pushdown is invoked more than once on a single Custom Data Source [datafusion]
jonahgao commented on issue #13994: URL: https://github.com/apache/datafusion/issues/13994#issuecomment-2574284445

> if `&mut self` was passed to this fn it would be much easier to control the functionality.

Since checking for supportability is more of a read-only operation, I think using `&self` would be more appropriate. Additionally, changing this would break many downstreams, so it’s usually more conservative to avoid making such breaking changes.

> Thanks for explaining this. We can probably work with this but the issue is that since we want some filters and not others (in other words some are preferred indexes) we need to keep state between supports_filters_pushdown calls.

DataFusion currently has no knowledge about the relationship between multiple filters, and the current way it operates assumes that they are independent.

Another approach may be to add your own [OptimizerRule](https://github.com/apache/datafusion/blob/4e877a08d224d992a8cbcc9a14f59468e312b13f/datafusion-examples/examples/optimizer_rule.rs#L35). You can select the filters in your [TableScan](https://github.com/apache/datafusion/blob/4e877a08d224d992a8cbcc9a14f59468e312b13f/datafusion/expr/src/logical_plan/plan.rs#L2486) and return the unnecessary ones to the parent `Filter` plan.
Re: [PR] Improve perfomance of `reverse` function [datafusion]
simonvandel commented on code in PR #14025: URL: https://github.com/apache/datafusion/pull/14025#discussion_r1904839947

## datafusion/functions/src/unicode/reverse.rs:

```diff
@@ -116,14 +115,23 @@ pub fn reverse(args: &[ArrayRef]) -> Result<ArrayRef> {
     }
 }
 
-fn reverse_impl<'a, T: OffsetSizeTrait, V: ArrayAccessor<Item = &'a str>>(
+fn reverse_impl<'a, T: OffsetSizeTrait, V: StringArrayType<'a>>(
     string_array: V,
 ) -> Result<ArrayRef> {
-    let result = ArrayIter::new(string_array)
-        .map(|string| string.map(|string: &str| string.chars().rev().collect::<String>()))
-        .collect::<GenericStringArray<T>>();
+    let mut builder: GenericStringBuilder<T> =
+        GenericStringBuilder::with_capacity(string_array.len(), 1024);
+
+    for string in string_array.iter() {
+        if let Some(s) = string {
+            let mut reversed = String::with_capacity(s.len());
```

Review Comment: I wonder if this allocation can be removed by using the `Write` impl? See https://arrow.apache.org/rust/arrow/array/type.GenericStringBuilder.html#example-incrementally-writing-strings-with-stdfmtwrite

Perhaps by iterating through the rev iterator, writing chars one at a time.

---

If the above is slower, it could also be interesting to see if reusing the String allocation with a clear() on every loop is faster.
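The first suggestion above (writing the reversed chars one at a time through a `Write` impl instead of allocating a `String` per row) can be sketched in plain Rust. This is a standalone illustration, not DataFusion's code: `StringSink` is a hypothetical stand-in for arrow's `GenericStringBuilder`, which likewise implements `std::fmt::Write`.

```rust
use std::fmt::Write;

// Hypothetical stand-in for arrow's GenericStringBuilder: all row values
// live in one contiguous String, with byte offsets marking row boundaries.
struct StringSink {
    data: String,        // concatenated row values
    offsets: Vec<usize>, // byte offset where each row starts/ends
}

impl StringSink {
    fn new() -> Self {
        Self { data: String::new(), offsets: vec![0] }
    }

    // Append one row containing `s` reversed, char by char, with no
    // intermediate per-row String allocation.
    fn append_reversed(&mut self, s: &str) {
        for ch in s.chars().rev() {
            // fmt::Write on String is infallible
            write!(self.data, "{ch}").unwrap();
        }
        self.offsets.push(self.data.len());
    }

    fn row(&self, i: usize) -> &str {
        &self.data[self.offsets[i]..self.offsets[i + 1]]
    }
}
```

Reversing by `chars()` rather than bytes keeps multi-byte UTF-8 sequences intact, which is why the sketch works on `char` boundaries.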
Re: [PR] Use partial aggregation schema for spilling to avoid column mismatch in GroupedHashAggregateStream [datafusion]
kosiew commented on code in PR #13995: URL: https://github.com/apache/datafusion/pull/13995#discussion_r1904814032

## datafusion/core/src/dataframe/mod.rs:

```diff
@@ -2743,6 +2754,143 @@ mod tests {
         Ok(())
     }
 
+    // test for https://github.com/apache/datafusion/issues/13949
+    async fn run_test_with_spill_pool_if_necessary(
```

Review Comment: hi @korowa, ie [move to datafusion/physical-plan/src/aggregates/mod.rs](https://github.com/apache/datafusion/pull/13995/commits/4e312e1ff0672812e7e12907cd2038bfd4eb6232), am I correct?
Re: [I] supports_filters_pushdown is invoked more than once on a single Custom Data Source [datafusion]
jonahgao commented on issue #13994: URL: https://github.com/apache/datafusion/issues/13994#issuecomment-2574295463

> One last question: If I have a query with [filterA,filterB] and on the initial call I return [Unsupported,Exact] and on a subsequent call you send me [filterA] and I return [Unsupported], then I believe the scan filters comes through as [] (empty). Is DataFusion still respecting the Exact on filterB? My guess is not, that is another important problem as if my custom data source only sends back data that matches filterB and DataFusion is again filtering all result data by filterB again the pushdown is not doing anything. I see this now, it does work because my data source did filter the data properly, but my guess is DataFusion is doing extra work it should not do.

In the current implementation, returned Exact filters will be pushed down to the TableScan and will be retained. Each round of pushdown will not revoke the previous effect.

But it's best not to rely on this; `supports_filters_pushdown` is a guarantee for DataFusion that your data source has the ability to handle a certain filter. DataFusion can push it down, but that doesn't mean it must be pushed down. Your scan needs to work regardless of whether filters are present.
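The contract described above can be illustrated with a simplified, self-contained sketch. The enum mirrors DataFusion's `TableProviderFilterPushDown`, but the string-based filters and the indexed-column check are hypothetical; the real trait method takes `&[&Expr]`.

```rust
// Simplified model of TableProvider::supports_filters_pushdown semantics.
#[derive(Debug, Clone, Copy, PartialEq)]
enum FilterPushDown {
    Unsupported, // DataFusion must apply the filter itself
    Inexact,     // source may return a superset; DataFusion re-filters
    Exact,       // source guarantees the filter is fully applied
}

// A read-only capability check: the answer for each filter should not
// depend on how many times this function is called, since DataFusion
// may invoke it more than once and treats filters as independent.
fn supports_filters_pushdown(filters: &[&str]) -> Vec<FilterPushDown> {
    filters
        .iter()
        .map(|f| {
            // Hypothetical rule: only filters on an indexed column can be
            // evaluated exactly by this imaginary data source.
            if f.starts_with("indexed_col") {
                FilterPushDown::Exact
            } else {
                FilterPushDown::Unsupported
            }
        })
        .collect()
}
```

Because the check is stateless, repeated calls with different filter subsets always give consistent answers, which matches the "capability guarantee, not an obligation" framing above.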
[PR] [comet-parquet-exec] fix: Fix null struct [datafusion-comet]
andygrove opened a new pull request, #1226: URL: https://github.com/apache/datafusion-comet/pull/1226

## Which issue does this PR close?

N/A

## Rationale for this change

Fix bug in reading null structs to fix some test failures

## What changes are included in this PR?

## How are these changes tested?
Re: [PR] chore: Follow-on PR to fully enable onheap memory usage [datafusion-comet]
andygrove merged PR #1210: URL: https://github.com/apache/datafusion-comet/pull/1210
Re: [PR] feat(optimizer): Enable filter pushdown on window functions [datafusion]
comphead commented on PR #14026: URL: https://github.com/apache/datafusion/pull/14026#issuecomment-2574160403

That's a really nice idea, thanks @nuno-faria
[PR] chore: deprecate `ValuesExec` in favour of `MemoryExec` [datafusion]
jonathanc-n opened a new pull request, #14032: URL: https://github.com/apache/datafusion/pull/14032

## Which issue does this PR close?

Closes #13968.

## Rationale for this change

## What changes are included in this PR?

## Are these changes tested?

## Are there any user-facing changes?
Re: [I] Only create one native plan for a query on an executor [datafusion-comet]
viirya commented on issue #1204: URL: https://github.com/apache/datafusion-comet/issues/1204#issuecomment-2573663473

If ScanExec will be rarely used and we would like to use ParquetExec most of the time, maybe I can just add an internal cast to ScanExec if the schema is different. Though it might hurt performance a little bit, if it is a rare case it should be acceptable.
[PR] Refac: make Nested Func public and implement Default trait [datafusion]
dharanad opened a new pull request, #14030: URL: https://github.com/apache/datafusion/pull/14030

## Which issue does this PR close?

Closes #.

## Rationale for this change

## What changes are included in this PR?

## Are these changes tested?

## Are there any user-facing changes?
Re: [PR] Refac: make Nested Func public and implement Default trait [datafusion]
dharanad commented on PR #14030: URL: https://github.com/apache/datafusion/pull/14030#issuecomment-2573939947

cc @alamb @jayzhan211
[PR] fix: Simplify native scan config [datafusion-comet]
parthchandra opened a new pull request, #1225: URL: https://github.com/apache/datafusion-comet/pull/1225

## Which issue does this PR close?

Simplifies the native scan config. To choose a native scan implementation we can now set `spark.comet.scan.impl`. Valid values are `native`, `native_full`, `native_recordbatch`.

| reader | description |
| -- | -- |
| native | Original native reader |
| native_full | Full native reader based on DataFusion |
| native_recordbatch | Native reading of record batch columns (Iceberg) |

Closes #.
Re: [PR] fix: Simplify native scan config [datafusion-comet]
parthchandra commented on PR #1225: URL: https://github.com/apache/datafusion-comet/pull/1225#issuecomment-2573863212

@andygrove @mbutrovich The config defaults to `native_full`. To switch the implementation in tests, change the values in `CometConf`, `CometTestBase`, and `CometPlanStabilitySuite`.
Re: [PR] Feat: Add support for `array_size` [datafusion-comet]
dharanad closed pull request #1214: Feat: Add support for `array_size` URL: https://github.com/apache/datafusion-comet/pull/1214
Re: [PR] Feat: Add support for `array_size` [datafusion-comet]
dharanad commented on PR #1214: URL: https://github.com/apache/datafusion-comet/pull/1214#issuecomment-2573867883

> There is already one PR for array_size support: #1122

I must have overlooked it. Thanks for letting me know. Closing this PR.
Re: [PR] [comet-parquet-exec] fix: Simplify native scan config [datafusion-comet]
andygrove merged PR #1225: URL: https://github.com/apache/datafusion-comet/pull/1225
Re: [I] Define extension API for user-defined invariants. [datafusion]
alamb commented on issue #14029: URL: https://github.com/apache/datafusion/issues/14029#issuecomment-2573905410

For example, if we added a function like this to the `ExecutionPlan` trait, as proposed in https://github.com/apache/datafusion/pull/13986#discussion_r1901312798, I think that would permit implementing the invariant check without any more work as I understand it:

```rust
impl ExecutionPlan {
    ...
    fn check_node_invariants(&self, invariant_level: InvariantLevel) -> Result<()> {
        Ok(())
    }
    ...
}
```
Re: [I] Define extension API for user-defined invariants. [datafusion]
wiedld commented on issue #14029: URL: https://github.com/apache/datafusion/issues/14029#issuecomment-2573909488

> For example, if we added a function like this to the `ExecutionPlan` trait, as proposed in [#13986 (comment)](https://github.com/apache/datafusion/pull/13986#discussion_r1901312798) I think that would permit implementing the invariant check without any more work as I understand it

Agreed. If this^^ is the final form of the execution plan invariants, then we don't need an additional interface for the execution plan invariant extensions.

That would still leave the logical plan invariant extensions, for consideration.
Re: [PR] Minor: Remove redundant implementation of `StringArrayType` [datafusion]
alamb commented on PR #14023: URL: https://github.com/apache/datafusion/pull/14023#issuecomment-2573992922

I pushed a commit to deprecate (rather than remove) the trait.
Re: [PR] Minor: make nested functions public and implement Default trait [datafusion]
alamb merged PR #14030: URL: https://github.com/apache/datafusion/pull/14030
Re: [PR] chore: Follow-on PR to fully enable onheap memory usage [datafusion-comet]
andygrove commented on PR #1210: URL: https://github.com/apache/datafusion-comet/pull/1210#issuecomment-2573995471

Thanks for the reviews @viirya @kazuyukitanimura @Kontinuation
Re: [PR] Add H2O.ai Database-like Ops benchmark to dfbench (groupby support) [datafusion]
zhuqi-lucas commented on PR #13996: URL: https://github.com/apache/datafusion/pull/13996#issuecomment-2574217686

Hi @alamb, this is the PR supporting groupby first.
Re: [PR] Use partial aggregation schema for spilling to avoid column mismatch in GroupedHashAggregateStream [datafusion]
kosiew commented on code in PR #13995: URL: https://github.com/apache/datafusion/pull/13995#discussion_r1904803932

## datafusion/physical-plan/src/aggregates/row_hash.rs:

```diff
@@ -802,6 +807,45 @@ impl RecordBatchStream for GroupedHashAggregateStream {
     }
 }
 
+// fix https://github.com/apache/datafusion/issues/13949
+/// Builds a **partial aggregation** schema by combining the group columns and
+/// the accumulator state columns produced by each aggregate expression.
+///
+/// # Why Partial Aggregation Schema Is Needed
+///
+/// In a multi-stage (partial/final) aggregation strategy, each partial-aggregate
+/// operator produces *intermediate* states (e.g., partial sums, counts) rather
+/// than final scalar values. These extra columns do **not** exist in the original
+/// input schema (which may be something like `[colA, colB, ...]`). Instead,
+/// each aggregator adds its own internal state columns (e.g., `[acc_state_1, acc_state_2, ...]`).
+///
+/// Therefore, when we spill these intermediate states or pass them to another
+/// aggregation operator, we must use a schema that includes both the group
+/// columns **and** the partial-state columns. Otherwise, using the original input
+/// schema to read partial states will result in a column-count mismatch error.
+///
+/// This helper function constructs such a schema:
+/// `[group_col_1, group_col_2, ..., state_col_1, state_col_2, ...]`
+/// so that partial aggregation data can be handled consistently.
+fn build_partial_agg_schema(
```

Review Comment: Aaa. 🤔 Thanks for the pointer. It does work.
[I] Enhance msrv check to check all crates [datafusion]
Jefffrey opened a new issue, #14022: URL: https://github.com/apache/datafusion/issues/14022

I wonder if we should be checking more (or all) crates here?

https://github.com/apache/datafusion/blob/b8b0c5584f9f3a3aeca730ef1ac23dafc3e76dde/.github/workflows/rust.yml#L594-L641

See the arrow-rs PR for reference, which identified a similar issue there: https://github.com/apache/arrow-rs/pull/6742

_Originally posted by @Jefffrey in https://github.com/apache/datafusion/pull/14009#pullrequestreview-2530810617_
Re: [PR] Improve performance of `find_in_set` function [datafusion]
jayzhan211 commented on code in PR #14020: URL: https://github.com/apache/datafusion/pull/14020#discussion_r1904073770

## datafusion/functions/src/unicode/find_in_set.rs:

```diff
@@ -138,31 +138,144 @@ fn find_in_set(args: &[ArrayRef]) -> Result<ArrayRef> {
     }
 }
 
-pub fn find_in_set_general<'a, T: ArrowPrimitiveType, V: ArrayAccessor<Item = &'a str>>(
+pub fn find_in_set_general<'a, T, V>(
     string_array: V,
     str_list_array: V,
 ) -> Result<ArrayRef>
 where
+    T: ArrowPrimitiveType,
     T::Native: OffsetSizeTrait,
+    V: ArrayAccessor<Item = &'a str>,
 {
     let string_iter = ArrayIter::new(string_array);
     let str_list_iter = ArrayIter::new(str_list_array);
-    let result = string_iter
+
+    let mut builder = PrimitiveArray::<T>::builder(string_iter.len());
+
+    string_iter
         .zip(str_list_iter)
-        .map(|(string, str_list)| match (string, str_list) {
-            (Some(string), Some(str_list)) => {
-                let mut res = 0;
-                let str_set: Vec<&str> = str_list.split(',').collect();
-                for (idx, str) in str_set.iter().enumerate() {
-                    if str == &string {
-                        res = idx + 1;
-                        break;
-                    }
+        .for_each(
+            |(string_opt, str_list_opt)| match (string_opt, str_list_opt) {
+                (Some(string), Some(str_list)) => {
+                    let position = str_list
+                        .split(',')
+                        .position(|s| s == string)
+                        .map_or(0, |idx| idx + 1);
+                    builder.append_value(T::Native::from_usize(position).unwrap());
```

Review Comment: Probably faster to create a `Vec` first and use `PrimitiveArray::from()`
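The 1-based position logic in the diff above can be isolated as a small sketch (plain Rust over `&str`, not the actual arrow-array implementation): `find_in_set` returns the 1-based index of the needle within a comma-separated list, or 0 if absent.

```rust
// Standalone sketch of the per-row logic: 1-based position of `needle`
// in the comma-separated `list`, or 0 when not found. Matches the
// `position(..).map_or(0, |idx| idx + 1)` pattern in the diff above.
fn find_in_set(needle: &str, list: &str) -> usize {
    list.split(',')
        .position(|item| item == needle)
        .map_or(0, |idx| idx + 1)
}
```

Note that `split(',')` yields empty strings for adjacent commas, so an empty needle can legitimately match an empty list element.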
Re: [PR] [Minor] refactor: make ArraySort public for broader access [datafusion]
jayzhan211 commented on PR #14006: URL: https://github.com/apache/datafusion/pull/14006#issuecomment-2572979702

Thanks @dharanad @alamb
Re: [PR] [Minor] refactor: make ArraySort public for broader access [datafusion]
jayzhan211 merged PR #14006: URL: https://github.com/apache/datafusion/pull/14006
Re: [PR] Use partial aggregation schema for spilling to avoid column mismatch in GroupedHashAggregateStream [datafusion]
kosiew commented on code in PR #13995: URL: https://github.com/apache/datafusion/pull/13995#discussion_r1904077638 ## datafusion/physical-plan/src/aggregates/row_hash.rs: ## @@ -802,6 +807,45 @@ impl RecordBatchStream for GroupedHashAggregateStream { } } +// fix https://github.com/apache/datafusion/issues/13949 +/// Builds a **partial aggregation** schema by combining the group columns and +/// the accumulator state columns produced by each aggregate expression. +/// +/// # Why Partial Aggregation Schema Is Needed +/// +/// In a multi-stage (partial/final) aggregation strategy, each partial-aggregate +/// operator produces *intermediate* states (e.g., partial sums, counts) rather +/// than final scalar values. These extra columns do **not** exist in the original +/// input schema (which may be something like `[colA, colB, ...]`). Instead, +/// each aggregator adds its own internal state columns (e.g., `[acc_state_1, acc_state_2, ...]`). +/// +/// Therefore, when we spill these intermediate states or pass them to another +/// aggregation operator, we must use a schema that includes both the group +/// columns **and** the partial-state columns. Otherwise, using the original input +/// schema to read partial states will result in a column-count mismatch error. +/// +/// This helper function constructs such a schema: +/// `[group_col_1, group_col_2, ..., state_col_1, state_col_2, ...]` +/// so that partial aggregation data can be handled consistently. +fn build_partial_agg_schema( Review Comment: I checked create_schema and it handles aggregates like MIN and MAX well, but it does not handle AVG, which has multiple intermediate states (partial sum, partial count).
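The doc comment above describes combining group columns with per-aggregate state columns. A minimal, self-contained sketch of that idea follows — the `Field` struct and `avg[sum]`/`avg[count]` names are illustrative stand-ins, not DataFusion's actual types:

```rust
// Hypothetical sketch (not DataFusion's real API): a partial aggregation
// schema is the group columns followed by each aggregate's state columns.
#[derive(Debug, Clone, PartialEq)]
struct Field {
    name: String,
    data_type: &'static str,
}

fn build_partial_agg_schema(
    group_fields: &[Field],
    state_fields_per_agg: &[Vec<Field>],
) -> Vec<Field> {
    // Start with the group-by columns...
    let mut schema: Vec<Field> = group_fields.to_vec();
    // ...then append every aggregate's intermediate-state columns in order.
    for state_fields in state_fields_per_agg {
        schema.extend(state_fields.iter().cloned());
    }
    schema
}

fn main() {
    let field = |n: &str, t: &'static str| Field { name: n.to_string(), data_type: t };
    // AVG contributes *two* state columns (sum, count), unlike MIN/MAX,
    // which is exactly why the original input schema cannot be reused.
    let schema = build_partial_agg_schema(
        &[field("colA", "Utf8")],
        &[vec![field("avg[sum]", "Float64"), field("avg[count]", "UInt64")]],
    );
    let names: Vec<&str> = schema.iter().map(|f| f.name.as_str()).collect();
    println!("{:?}", names); // ["colA", "avg[sum]", "avg[count]"]
}
```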
Re: [PR] `url` dependancy update [datafusion]
vadimpiven commented on code in PR #14019: URL: https://github.com/apache/datafusion/pull/14019#discussion_r1904083177 ## Cargo.toml: ## @@ -150,7 +150,7 @@ serde_json = "1" sqlparser = { version = "0.53.0", features = ["visitor"] } tempfile = "3" tokio = { version = "1.36", features = ["macros", "rt", "sync"] } -url = "2.2" +url = "2.5.4" Review Comment: Good point, missed that)
Re: [PR] Use workspace rust-version for all workspace crates [datafusion]
alamb commented on PR #14009: URL: https://github.com/apache/datafusion/pull/14009#issuecomment-2572999127 (BTW welcome back @Jefffrey -- it is great to have you around!)
Re: [I] Assert for invariants in tests and debug builds [datafusion]
alamb commented on issue #594: URL: https://github.com/apache/datafusion/issues/594#issuecomment-2572965153 It seems we have re-discovered this idea 10,000 tickets later in - https://github.com/apache/datafusion/issues/13652 FYI @wiedld Let's close this issue and use the newer one for further discussion
Re: [PR] Use workspace rust-version for all workspace crates [datafusion]
Jefffrey commented on PR #14009: URL: https://github.com/apache/datafusion/pull/14009#issuecomment-2572965242 Thanks @alamb Raised #14022 as well
Re: [I] Assert for invariants in tests and debug builds [datafusion]
alamb closed issue #594: Assert for invariants in tests and debug builds URL: https://github.com/apache/datafusion/issues/594
Re: [I] Automatically check "invariants" [datafusion]
alamb commented on issue #13652: URL: https://github.com/apache/datafusion/issues/13652#issuecomment-2572966102 I just discovered that @houqp basically filed this same ticket 2 years ago: - https://github.com/apache/datafusion/issues/594
Re: [PR] Use partial aggregation schema for spilling to avoid column mismatch in GroupedHashAggregateStream [datafusion]
korowa commented on code in PR #13995: URL: https://github.com/apache/datafusion/pull/13995#discussion_r1904059022 ## datafusion/physical-plan/src/aggregates/row_hash.rs: ## @@ -522,7 +527,7 @@ impl GroupedHashAggregateStream { let spill_state = SpillState { spills: vec![], spill_expr, -spill_schema: Arc::clone(&agg_schema), +spill_schema: partial_agg_schema, Review Comment: Yes, this line seems to be redundant now -- I'd expect all aggregation modes to have the same spill schema (which is set by this PR), so it shouldn't depend on stream input anymore.
Re: [I] Fix rust-version key in workspace Cargo.toml to inherit from workspace [datafusion]
Jefffrey closed issue #9214: Fix rust-version key in workspace Cargo.toml to inherit from workspace URL: https://github.com/apache/datafusion/issues/9214
Re: [PR] Use workspace rust-version for all workspace crates [datafusion]
Jefffrey merged PR #14009: URL: https://github.com/apache/datafusion/pull/14009
Re: [PR] Improve performance of `find_in_set` function [datafusion]
jayzhan-synnada commented on code in PR #14020: URL: https://github.com/apache/datafusion/pull/14020#discussion_r1904073374 ## datafusion/functions/src/unicode/find_in_set.rs: ## @@ -138,31 +138,144 @@ fn find_in_set(args: &[ArrayRef]) -> Result<ArrayRef> { } } -pub fn find_in_set_general<'a, T: ArrowPrimitiveType, V: ArrayAccessor<Item = &'a str>>( +pub fn find_in_set_general<'a, T, V>( string_array: V, str_list_array: V, ) -> Result<ArrayRef> where +T: ArrowPrimitiveType, T::Native: OffsetSizeTrait, +V: ArrayAccessor<Item = &'a str>, { let string_iter = ArrayIter::new(string_array); let str_list_iter = ArrayIter::new(str_list_array); -let result = string_iter + +let mut builder = PrimitiveArray::<T>::builder(string_iter.len()); + string_iter .zip(str_list_iter) -.map(|(string, str_list)| match (string, str_list) { -(Some(string), Some(str_list)) => { -let mut res = 0; -let str_set: Vec<&str> = str_list.split(',').collect(); -for (idx, str) in str_set.iter().enumerate() { -if str == &string { -res = idx + 1; -break; -} +.for_each( |(string_opt, str_list_opt)| match (string_opt, str_list_opt) { +(Some(string), Some(str_list)) => { +let position = str_list +.split(',') +.position(|s| s == string) +.map_or(0, |idx| idx + 1); + builder.append_value(T::Native::from_usize(position).unwrap()); Review Comment: Probably faster if you create a `Vec` first and use `PrimitiveArray::from()`
[PR] Minor: Remove redundant implementation of `StringArrayType` [datafusion]
tlm365 opened a new pull request, #14023: URL: https://github.com/apache/datafusion/pull/14023 ## Which issue does this PR close? Closes #. ## Rationale for this change Remove redundant implementation of `StringArrayType` ## What changes are included in this PR? ## Are these changes tested? By CI. ## Are there any user-facing changes? No.
Re: [PR] Minor: Remove redundant implementation of `StringArrayType` [datafusion]
alamb commented on PR #14023: URL: https://github.com/apache/datafusion/pull/14023#issuecomment-2573043773 Thank you @tlm365 ❤️
Re: [I] Only create one native plan for a query on an executor [datafusion-comet]
andygrove commented on issue #1204: URL: https://github.com/apache/datafusion-comet/issues/1204#issuecomment-2573648804 With the new Parquet POC 1 & 2, we will use ParquetExec instead of the current ScanExec, so at least for that case the schema will already be known and we will no longer need to fetch the first batch to determine it. This doesn't help with other uses of ScanExec though.
Re: [I] Only create one native plan for a query on an executor [datafusion-comet]
andygrove commented on issue #1204: URL: https://github.com/apache/datafusion-comet/issues/1204#issuecomment-2573676778 We'll still use ScanExec for shuffle reader though. The main reason for the initial batch scan is to determine if strings are dictionary-encoded or not. We then cast all batches to match the first batch (either unpacking dictionaries or forcing dictionary encoding). We always unpack dictionaries (in CopyExec) before a Sort or a Join anyway, so maybe we should just unpack them directly in ScanExec if there is no performance impact. I did experiment with this before but I do not remember what the performance impact was but I think it was small.
[PR] fix: yield when the next file is ready to open to prevent CPU starvation [datafusion]
jeffreyssmith2nd opened a new pull request, #14028: URL: https://github.com/apache/datafusion/pull/14028 ## Which issue does this PR close? Closes #. ## Rationale for this change ## What changes are included in this PR? ## Are these changes tested? ## Are there any user-facing changes?
Re: [PR] Add support for MySQL's INSERT INTO ... SET syntax [datafusion-sqlparser-rs]
iffyio merged PR #1641: URL: https://github.com/apache/datafusion-sqlparser-rs/pull/1641
Re: [I] Only create one native plan for a query on an executor [datafusion-comet]
viirya commented on issue #1204: URL: https://github.com/apache/datafusion-comet/issues/1204#issuecomment-2573695955 Okay. Then it seems we can get rid of the first-batch fetch in ScanExec and assign the scan schema from Spark. I will give it a try.
Re: [PR] chore: extract agg_funcs expressions to folders based on spark grouping [datafusion-comet]
andygrove commented on PR #1224: URL: https://github.com/apache/datafusion-comet/pull/1224#issuecomment-2573697981 @rluvaton could you rebase this one and we can merge this one next?
Re: [I] Support pruning on string columns using LIKE [datafusion]
alamb commented on issue #507: URL: https://github.com/apache/datafusion/issues/507#issuecomment-2573715986 > I think we also need follow up tickets for: > > * NOT LIKE > * Case insensitive matching Sounds good -- can you please file them (and the more hints you leave in the ticket the more likely it is for someone else to be able to do it)
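Pruning on `LIKE` works when the pattern has a literal prefix: for `col LIKE 'ab%'`, a row group whose min/max statistics cannot contain any string starting with `"ab"` can be skipped. A hedged, self-contained sketch of that conservative check (illustrative only — not DataFusion's `PruningPredicate` API):

```rust
// Conservative prefix-pruning check for `col LIKE 'prefix%'`:
// returns false only when the [min, max] statistics range provably
// contains no string starting with `prefix`.
fn may_match_prefix(min: &str, max: &str, prefix: &str) -> bool {
    // Every matching value is >= prefix, so if max < prefix, no match.
    if max < prefix {
        return false;
    }
    // Every matching value is < prefix with its last byte incremented
    // (a conservative upper bound); compare as bytes since the bumped
    // byte sequence may not be valid UTF-8.
    let mut upper = prefix.as_bytes().to_vec();
    if let Some(last) = upper.last_mut() {
        if *last < 0xFF {
            *last += 1;
            if min.as_bytes() >= upper.as_slice() {
                return false;
            }
        }
    }
    true
}

fn main() {
    assert!(may_match_prefix("aaa", "abz", "ab")); // range overlaps "ab..."
    assert!(!may_match_prefix("x", "z", "ab"));    // min is past all "ab..."
    assert!(!may_match_prefix("a", "aa", "ab"));   // max is before "ab..."
    println!("ok");
}
```

NOT LIKE and case-insensitive matching (the follow-ups mentioned above) need different bounds and are not covered by this sketch.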
Re: [PR] feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support [datafusion-comet]
kazuyukitanimura commented on code in PR #1192: URL: https://github.com/apache/datafusion-comet/pull/1192#discussion_r1904545210 ## spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala: ## @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.spark.sql.comet.execution.shuffle + +import java.io.{EOFException, InputStream} +import java.nio.{ByteBuffer, ByteOrder} +import java.nio.channels.{Channels, ReadableByteChannel} + +import org.apache.spark.TaskContext +import org.apache.spark.sql.execution.metric.SQLMetric +import org.apache.spark.sql.vectorized.ColumnarBatch + +import org.apache.comet.Native +import org.apache.comet.vector.NativeUtil + +/** + * This iterator wraps a Spark input stream that is reading shuffle blocks generated by the Comet + * native ShuffleWriterExec and then calls native code to decompress and decode the shuffle blocks + * and use Arrow FFI to return the Arrow record batch. 
+ */ +case class NativeBatchDecoderIterator( +var in: InputStream, +taskContext: TaskContext, +decodeTime: SQLMetric) +extends Iterator[ColumnarBatch] { + + private var isClosed = false + private val longBuf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN) + private val native = new Native() + private val nativeUtil = new NativeUtil() + private var currentBatch: ColumnarBatch = null + private var batch = fetchNext() + + import NativeBatchDecoderIterator.threadLocalDataBuf + + if (taskContext != null) { +taskContext.addTaskCompletionListener[Unit](_ => { + close() +}) + } + + private val channel: ReadableByteChannel = if (in != null) { +Channels.newChannel(in) + } else { +null + } + + def hasNext(): Boolean = { +if (channel == null || isClosed) { + return false +} +if (batch.isDefined) { + return true +} + +// Release the previous batch. +if (currentBatch != null) { + currentBatch.close() + currentBatch = null +} + +batch = fetchNext() +if (batch.isEmpty) { + close() + return false +} +true + } + + def next(): ColumnarBatch = { +if (!hasNext) { + throw new NoSuchElementException +} + +val nextBatch = batch.get + +currentBatch = nextBatch +batch = None +currentBatch + } + + private def fetchNext(): Option[ColumnarBatch] = { +if (channel == null || isClosed) { + return None +} + +// read compressed batch size from header +try { + longBuf.clear() + while (longBuf.hasRemaining && channel.read(longBuf) >= 0) {} +} catch { + case _: EOFException => +close() +return None +} + +// If we reach the end of the stream, we are done, or if we read partial length +// then the stream is corrupted. 
+if (longBuf.hasRemaining) { + if (longBuf.position() == 0) { +close() +return None + } + throw new EOFException("Data corrupt: unexpected EOF while reading compressed ipc lengths") +} + +// get compressed length (including headers) +longBuf.flip() +val compressedLength = longBuf.getLong.toInt + +// read field count from header +longBuf.clear() +while (longBuf.hasRemaining && channel.read(longBuf) >= 0) {} +if (longBuf.hasRemaining) { + throw new EOFException("Data corrupt: unexpected EOF while reading field count") +} +longBuf.flip() +val fieldCount = longBuf.getLong.toInt + +// read body +val bytesToRead = compressedLength - 8 +var dataBuf = threadLocalDataBuf.get() +if (dataBuf.capacity() < bytesToRead) { + // it is unlikely that we would overflow here since it would + // require a 1GB compressed shuffle block but we check anyway + val newCapacity = (bytesToRead * 2L).min(Integer.MAX_VALUE).toInt Review Comment: `bytesToRead * 2L` may become negative, and in that case, `newCapacity` would be negative...
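The review point above is that a length decoded from an untrusted shuffle-block header can be negative (corrupt data), so sizing a buffer from it must be guarded. A small Rust sketch of the guard (an assumption-level illustration of the fix being discussed, not actual Comet code):

```rust
// Validate a length read from an untrusted header before using it to
// size a buffer; double it for growth headroom, clamped to i32::MAX
// (mirroring the `.min(Integer.MAX_VALUE)` clamp in the Scala code).
fn new_capacity(bytes_to_read: i64) -> Result<usize, String> {
    if bytes_to_read < 0 {
        // A corrupt header would otherwise propagate a negative capacity.
        return Err(format!("corrupt length header: {bytes_to_read}"));
    }
    let doubled = bytes_to_read.saturating_mul(2);
    Ok(doubled.min(i32::MAX as i64) as usize)
}

fn main() {
    assert_eq!(new_capacity(1024), Ok(2048));
    assert!(new_capacity(-8).is_err()); // rejected instead of going negative
    assert_eq!(new_capacity(2_000_000_000), Ok(i32::MAX as usize)); // clamped
    println!("ok");
}
```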
[I] Define extension API for user-defined invariants. [datafusion]
wiedld opened a new issue, #14029: URL: https://github.com/apache/datafusion/issues/14029 ### Is your feature request related to a problem or challenge? As part of the work to [automatically check invariants](https://github.com/apache/datafusion/issues/13652) for the logical and execution plans, we have provided infrastructure to run an invariant checker. This invariant checker runs at limited time points in order to not degrade planning performance; e.g. after all optimizations are completed. In debug mode, it runs these checks more often and can therefore help to quickly isolate at which point (e.g. which specific optimizer run) make the plan become invalid. We want to also enable users to add their own invariants. Users are already able to add their own Logical and Execution plan extensions, as well as their own optimization runs which modify these plans. Therefore it may be useful for an invariant extension interface for user-defined invariants. e.g. If a change in Datafusion core's optimizer passes will cause a problem in a user-defined Logical plan extension, then the user could define an invariant based upon what their Logical plan extension requires. Refer to specific examples [in this conversation](https://github.com/apache/datafusion/pull/13651#discussion_r1873973604), for plan extensions which have their own invariants. For the example case of our own `ProgressiveEval` -- we require the input partition streams to have specific sort orders, non-overlapping column ranges, and no pushdown of the offset [(issue)](https://github.com/apache/datafusion/issues/12423) in order to provide the correct result. An invariant check, performed after each optimizer run (while in debug mode), would enable us to quickly isolate the problem during DF upgrade. (We have several other, more complex, examples of how changes in the optimization of UNIONs has produced invalid plans for our `SortPreservingMerge`. 
So this is not a one-off example, the above is merely the simplest concrete example.) ### Describe the solution you'd like Take the existing invariant infrastructure provided as part of [this issue](https://github.com/apache/datafusion/issues/13652#issuecomment-2573659546), and provide extension points for users to define their own invariants. ### Describe alternatives you've considered * Alternative 1: for a user-defined Execution plan extension, have a runtime check of invariants be performed. * Con: this detects problems after planning time, thereby increasing both time-until-error as well as resource utilization. * Alternative 2: for either Logical or Physical plan extensions, the user can define an optimization run which is intended to detect invariant violations which are in conflict with their plan extensions. * Pro: can detect invariant violation at planning time * Con: arguably more code complexity: * in order to isolate exactly which plan mutation (Datafusion core change) caused the problem, it would need to be coded to run after each optimizer pass. ### Additional context _No response_
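The extension API requested in the issue above might take a shape like the following — a purely hypothetical sketch (names and types are illustrative, not DataFusion's actual API), using the issue's `ProgressiveEval` sort-order requirement as the example invariant:

```rust
// Toy stand-in for a plan node; a real API would walk an ExecutionPlan tree.
#[derive(Debug)]
struct Plan {
    name: &'static str,
    ordered_inputs: bool,
}

// Hypothetical extension trait for user-defined invariants.
trait InvariantExt {
    fn check(&self, plan: &Plan) -> Result<(), String>;
}

// Example user invariant: a ProgressiveEval-like node requires its
// input partition streams to have a specific sort order.
struct RequiresSortedInputs;

impl InvariantExt for RequiresSortedInputs {
    fn check(&self, plan: &Plan) -> Result<(), String> {
        if plan.name == "ProgressiveEval" && !plan.ordered_inputs {
            return Err("ProgressiveEval requires sorted input partitions".into());
        }
        Ok(())
    }
}

// The planner could run every registered invariant after each optimizer
// pass in debug builds, and only once after optimization in release.
fn check_all(invariants: &[Box<dyn InvariantExt>], plan: &Plan) -> Result<(), String> {
    invariants.iter().try_for_each(|inv| inv.check(plan))
}

fn main() {
    let invariants: Vec<Box<dyn InvariantExt>> = vec![Box::new(RequiresSortedInputs)];
    let good = Plan { name: "ProgressiveEval", ordered_inputs: true };
    let bad = Plan { name: "ProgressiveEval", ordered_inputs: false };
    assert!(check_all(&invariants, &good).is_ok());
    assert!(check_all(&invariants, &bad).is_err());
    println!("ok");
}
```

Running such checks after each optimizer pass (debug only) is what lets a DF upgrade failure be isolated to the specific pass that produced the invalid plan.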
Re: [PR] chore: extract json_funcs expressions to folders based on spark grouping [datafusion-comet]
codecov-commenter commented on PR #1220: URL: https://github.com/apache/datafusion-comet/pull/1220#issuecomment-2573627436 ## [Codecov](https://app.codecov.io/gh/apache/datafusion-comet/pull/1220?dropdown=coverage&src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) Report All modified and coverable lines are covered by tests :white_check_mark: > Project coverage is 34.81%. Comparing base [(`5f1e998`)](https://app.codecov.io/gh/apache/datafusion-comet/commit/5f1e99830caec96ce5889656cb8fca68a6d6d5e0?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache) to head [(`45cb6f9`)](https://app.codecov.io/gh/apache/datafusion-comet/commit/45cb6f9ec1574d51b237e933a8e018c1893ab62b?dropdown=coverage&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). > Report is 2 commits behind head on main. Additional details and impacted files

```diff
@@             Coverage Diff              @@
##             main    #1220       +/-   ##
============================================
+ Coverage   34.30%   34.81%    +0.51%
- Complexity    950      983       +33
  Files         116      116
  Lines       43711    43767       +56
  Branches     9565     9554       -11
+ Hits        14994    15238      +244
+ Misses      25748    25561      -187
+ Partials     2969     2968        -1
```

[:umbrella: View full report in Codecov by Sentry](https://app.codecov.io/gh/apache/datafusion-comet/pull/1220?dropdown=coverage&src=pr&el=continue&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache). :loudspeaker: Have feedback on the report? [Share it here](https://about.codecov.io/codecov-pr-comment-feedback/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=apache).
Re: [PR] feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support [datafusion-comet]
andygrove commented on code in PR #1192: URL: https://github.com/apache/datafusion-comet/pull/1192#discussion_r1904559170 ## spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala: ## (same diff context as quoted above) Review Comment: There are some other assumptions in the code though, so I will work on this some more.
Re: [PR] `url` dependancy update [datafusion]
korowa commented on code in PR #14019: URL: https://github.com/apache/datafusion/pull/14019#discussion_r1904056652

## Cargo.toml: ##
@@ -150,7 +150,7 @@
 serde_json = "1"
 sqlparser = { version = "0.53.0", features = ["visitor"] }
 tempfile = "3"
 tokio = { version = "1.36", features = ["macros", "rt", "sync"] }
-url = "2.2"
+url = "2.5.4"

Review Comment: Maybe we should do this also for `datafusion-cli/Cargo.toml`? (Though its .lock file already has version 2.5.4 of `url`.)

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] Feat: Add support for `array_size` [datafusion-comet]
viirya commented on PR #1214: URL: https://github.com/apache/datafusion-comet/pull/1214#issuecomment-2573878690 Thank you @dharanad -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [I] Inference of ListingTableConfig does not work (anymore) for compressed json file [datafusion]
alamb commented on issue #14016: URL: https://github.com/apache/datafusion/issues/14016#issuecomment-2573891190 Thank you @timvw -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [I] Define extension API for user-defined invariants. [datafusion]
alamb commented on issue #14029: URL: https://github.com/apache/datafusion/issues/14029#issuecomment-2573901927

Thanks @wiedld -- I don't fully understand the use case

> Take the existing invariant infrastructure provided as part of https://github.com/apache/datafusion/issues/13652#issuecomment-2573659546, and provide extension points for users to define their own invariants.

Could you provide an example of such an invariant? I normally think of "invariants" as some property that always holds true for a certain type of node (for example, that `LogicalPlan::Join` always has 2 inputs). The invariants in this case are defined by the semantics of the node itself (so as a user I couldn't add an invariant that `LogicalPlan::Join` had 3 inputs).

It would perhaps make sense to provide a way to define invariants for [`UserDefinedLogicalNode`](https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.UserDefinedLogicalNode.html) and for user-provided implementations of [`ExecutionPlan`](https://docs.rs/datafusion/latest/datafusion/physical_plan/trait.ExecutionPlan.html).

Defining an invariant check for `ExecutionPlan` would, I think, satisfy the use case you mention above of having specific rules for `ProgressiveEval` (a user-defined extension node).

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [I] [EPIC] Add support for all array expressions [datafusion-comet]
dharanad commented on issue #1042: URL: https://github.com/apache/datafusion-comet/issues/1042#issuecomment-2573945137

Many array functions in DataFusion currently have limited visibility. I have a pull request that addresses this issue: https://github.com/apache/datafusion/pull/14030

We can support a wider range of array functions once Comet is updated to consume the next DataFusion release.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support [datafusion-comet]
kazuyukitanimura commented on code in PR #1192: URL: https://github.com/apache/datafusion-comet/pull/1192#discussion_r1904650489

## native/core/src/execution/shuffle/shuffle_writer.rs: ##
@@ -1567,17 +1585,41 @@ pub fn write_ipc_compressed(
     let mut timer = ipc_time.timer();
     let start_pos = output.stream_position()?;
-    // write ipc_length placeholder
-    output.write_all(&[0u8; 8])?;
+    // seek past ipc_length placeholder
+    output.seek_relative(8)?;

Review Comment: I think I got it now, thanks

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support [datafusion-comet]
kazuyukitanimura commented on code in PR #1192: URL: https://github.com/apache/datafusion-comet/pull/1192#discussion_r1904652478

## common/src/main/scala/org/apache/comet/CometConf.scala: ##
@@ -272,18 +272,19 @@ object CometConf extends ShimCometConf {
     .booleanConf
     .createWithDefault(false)

-  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] = conf(
-    s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
-    .doc(
-      "The codec of Comet native shuffle used to compress shuffle data. Only zstd is supported. " +
-        "Compression can be disabled by setting spark.shuffle.compress=false.")
-    .stringConf
-    .checkValues(Set("zstd"))
-    .createWithDefault("zstd")
+  val COMET_EXEC_SHUFFLE_COMPRESSION_CODEC: ConfigEntry[String] =
+    conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.compression.codec")
+      .doc(
+        "The codec of Comet native shuffle used to compress shuffle data. lz4, zstd, and " +
+          "snappy are supported. Compression can be disabled by setting " +
+          "spark.shuffle.compress=false.")
+      .stringConf
+      .checkValues(Set("zstd", "lz4", "snappy"))
+      .createWithDefault("lz4")

   val COMET_EXEC_SHUFFLE_COMPRESSION_LEVEL: ConfigEntry[Int] =

Review Comment: nit since the config name now has `zstd`, the constant name should ideally reflect it, but optional

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] Minor: Improve zero partition check when inserting into `MemTable` [datafusion]
alamb commented on PR #14024: URL: https://github.com/apache/datafusion/pull/14024#issuecomment-2573980366 Thanks @jonahgao and @comphead ❤️ -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] Minor: Improve zero partition check when inserting into `MemTable` [datafusion]
alamb merged PR #14024: URL: https://github.com/apache/datafusion/pull/14024 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] feat: reads using global ctx [datafusion-python]
kylebarron commented on code in PR #982: URL: https://github.com/apache/datafusion-python/pull/982#discussion_r1904778623

## python/datafusion/io.py: ##
@@ -0,0 +1,181 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""IO read functions using global context."""
+
+import pathlib
+
+from datafusion.dataframe import DataFrame
+from datafusion.expr import Expr
+import pyarrow

Review Comment: As the SO answer explains, import sorting isn't currently part of the default `ruff-format` behavior. We'd need to opt-in by adding an `I` element here: https://github.com/apache/datafusion-python/blob/79c22d6d6c0809e7e93a0a23249baa516dbd8d6f/pyproject.toml#L66

## python/datafusion/io.py: ##
@@ -0,0 +1,181 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""IO read functions using global context."""
+
+import pathlib
+
+from datafusion.dataframe import DataFrame
+from datafusion.expr import Expr
+import pyarrow

Review Comment: As the [SO answer above explains](https://stackoverflow.com/a/77876298), import sorting isn't currently part of the default `ruff-format` behavior. We'd need to opt-in by adding an `I` element here: https://github.com/apache/datafusion-python/blob/79c22d6d6c0809e7e93a0a23249baa516dbd8d6f/pyproject.toml#L66

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
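Opting into ruff's import-sorting rules as discussed above would look roughly like the following in `pyproject.toml` (a sketch only; the actual rule list already present in datafusion-python's `pyproject.toml` may differ, and older ruff versions place `select` under `[tool.ruff]` rather than `[tool.ruff.lint]`):

```toml
[tool.ruff.lint]
select = [
    # existing rule codes assumed already enabled in the project...
    "I",  # isort: enables import-sorting checks, auto-fixable via `ruff check --fix`
]
```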
Re: [I] Add support for lz4 compression in shuffle [datafusion-comet]
andygrove closed issue #1178: Add support for lz4 compression in shuffle URL: https://github.com/apache/datafusion-comet/issues/1178 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support [datafusion-comet]
andygrove merged PR #1192: URL: https://github.com/apache/datafusion-comet/pull/1192 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] feat: reads using global ctx [datafusion-python]
kevinjqliu commented on code in PR #982: URL: https://github.com/apache/datafusion-python/pull/982#discussion_r1904776793

## python/datafusion/io.py: ##
@@ -0,0 +1,181 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""IO read functions using global context."""
+
+import pathlib
+
+from datafusion.dataframe import DataFrame
+from datafusion.expr import Expr
+import pyarrow

Review Comment: there's a pre-commit config for the ruff linter and formatter: https://github.com/apache/datafusion-python/blob/79c22d6d6c0809e7e93a0a23249baa516dbd8d6f/.pre-commit-config.yaml#L23-L30

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] Default to ZSTD compression when writing Parquet [datafusion-python]
kosiew commented on code in PR #981: URL: https://github.com/apache/datafusion-python/pull/981#discussion_r1904789223

## python/datafusion/dataframe.py: ##
@@ -620,16 +620,24 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
 def write_parquet(
     self,
     path: str | pathlib.Path,
-    compression: str = "uncompressed",
+    compression: str = "ZSTD",
     compression_level: int | None = None,
 ) -> None:
     """Execute the :py:class:`DataFrame` and write the results to a Parquet file.

     Args:
-        path: Path of the Parquet file to write.
-        compression: Compression type to use.
-        compression_level: Compression level to use.
-    """
+        path (str | pathlib.Path): The file path to write the Parquet file.
+        compression (str): The compression algorithm to use. Default is "ZSTD".
+        compression_level (int | None): The compression level to use. For ZSTD, the
+            recommended range is 1 to 22, with the default being 3. Higher levels
+            provide better compression but slower speed.
+    """
+    # default compression level to 3 for ZSTD
+    if compression == "ZSTD":
+        if compression_level is None:
+            compression_level = 3

Review Comment: Thanks. I have amended the default to 4

## python/datafusion/dataframe.py: ##
@@ -620,16 +620,24 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
 def write_parquet(
     self,
     path: str | pathlib.Path,
-    compression: str = "uncompressed",
+    compression: str = "ZSTD",
     compression_level: int | None = None,
 ) -> None:
     """Execute the :py:class:`DataFrame` and write the results to a Parquet file.

     Args:
-        path: Path of the Parquet file to write.
-        compression: Compression type to use.
-        compression_level: Compression level to use.
-    """
+        path (str | pathlib.Path): The file path to write the Parquet file.
+        compression (str): The compression algorithm to use. Default is "ZSTD".
+        compression_level (int | None): The compression level to use. For ZSTD, the
+            recommended range is 1 to 22, with the default being 3. Higher levels
+            provide better compression but slower speed.
+    """
+    # default compression level to 3 for ZSTD
+    if compression == "ZSTD":
+        if compression_level is None:
+            compression_level = 3

Review Comment: Thanks. I have amended the default to 4.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] Default to ZSTD compression when writing Parquet [datafusion-python]
kosiew commented on code in PR #981: URL: https://github.com/apache/datafusion-python/pull/981#discussion_r1904789223

## python/datafusion/dataframe.py: ##
@@ -620,16 +620,24 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None
 def write_parquet(
     self,
     path: str | pathlib.Path,
-    compression: str = "uncompressed",
+    compression: str = "ZSTD",
     compression_level: int | None = None,
 ) -> None:
     """Execute the :py:class:`DataFrame` and write the results to a Parquet file.

     Args:
-        path: Path of the Parquet file to write.
-        compression: Compression type to use.
-        compression_level: Compression level to use.
-    """
+        path (str | pathlib.Path): The file path to write the Parquet file.
+        compression (str): The compression algorithm to use. Default is "ZSTD".
+        compression_level (int | None): The compression level to use. For ZSTD, the
+            recommended range is 1 to 22, with the default being 3. Higher levels
+            provide better compression but slower speed.
+    """
+    # default compression level to 3 for ZSTD
+    if compression == "ZSTD":
+        if compression_level is None:
+            compression_level = 3

Review Comment: Thanks. I have [amended the default to 4](https://github.com/apache/datafusion-python/pull/981/commits/819de0d41e633b5c725b4cdfd7f2cbf49cc3db7d).

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
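The defaulting logic discussed in this thread (pick a sensible ZSTD level only when the caller does not supply one) can be sketched as follows; the function name is hypothetical, and the value 4 reflects the amended default mentioned in the review comments:

```python
def resolve_compression_level(compression: str, compression_level=None):
    """Sketch of the defaulting logic: if the codec is ZSTD and no level was
    given, fall back to level 4; otherwise pass the caller's choice through."""
    if compression.upper() == "ZSTD" and compression_level is None:
        return 4
    return compression_level

print(resolve_compression_level("ZSTD", None))    # 4
print(resolve_compression_level("ZSTD", 10))      # 10
print(resolve_compression_level("snappy", None))  # None
```

Keeping the fallback out of the signature default (rather than writing `compression_level: int = 4`) matters because other codecs either take no level or use a different range.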
Re: [PR] feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support [datafusion-comet]
andygrove commented on PR #1192: URL: https://github.com/apache/datafusion-comet/pull/1192#issuecomment-2573691492 @viirya @kazuyukitanimura @mbutrovich @comphead Thanks for the reviews so far. I believe I have addressed all feedback now. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [I] Support pruning on string columns using LIKE [datafusion]
adriangb commented on issue #507: URL: https://github.com/apache/datafusion/issues/507#issuecomment-2573702237 I think we also need follow up tickets for: - NOT LIKE - Case insensitive matching -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support [datafusion-comet]
andygrove commented on code in PR #1192: URL: https://github.com/apache/datafusion-comet/pull/1192#discussion_r1904556240

## native/core/src/execution/shuffle/shuffle_writer.rs: ##
@@ -1567,17 +1585,41 @@ pub fn write_ipc_compressed(
     let mut timer = ipc_time.timer();
     let start_pos = output.stream_position()?;
-    // write ipc_length placeholder
-    output.write_all(&[0u8; 8])?;
+    // seek past ipc_length placeholder
+    output.seek_relative(8)?;

Review Comment: `seek_relative` takes an `i64` argument so can seek backwards or forwards, but we are seeking forwards (+8).

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
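The pattern under discussion (skip 8 bytes for a length placeholder, write the body, then seek back and fill in the length) can be sketched in Python; this is a simplified approximation using absolute seeks on an in-memory buffer, whereas the Rust code uses a relative seek on a buffered writer, and the function name is hypothetical:

```python
import struct
from io import BytesIO

def write_frame(out, body: bytes) -> None:
    """Write an 8-byte little-endian length header followed by `body`,
    filling in the header after the body length is known."""
    start = out.tell()
    out.seek(start + 8)                             # seek past the length placeholder
    out.write(body)
    end = out.tell()
    out.seek(start)                                 # rewind to the placeholder
    out.write(struct.pack("<q", end - start - 8))   # fill in the body length
    out.seek(end)                                   # restore position for the next frame

buf = BytesIO()
write_frame(buf, b"payload")
data = buf.getvalue()
print(struct.unpack("<q", data[:8])[0])  # 7
print(data[8:])                          # b'payload'
```

The benefit of seeking past a placeholder instead of writing zeros up front is that it avoids one redundant write per block; the trade-off is that the sink must support seeking at all, which plain streaming writers do not.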
Re: [I] Automatically check "invariants" [datafusion]
alamb commented on issue #13652: URL: https://github.com/apache/datafusion/issues/13652#issuecomment-2573635690 I suggest we use this ticket to track the infrastructure for checking invariants (e.g. what @wiedld is doing in https://github.com/apache/datafusion/pull/13986) and then claim success. We can introduce additional invariant checks as we discover them -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [I] sql result discrepency with sqlite, postgres and duckdb [datafusion]
Omega359 commented on issue #13780: URL: https://github.com/apache/datafusion/issues/13780#issuecomment-2573650403 Addendum: Since the sqlite tests come from sqlite (duh) where REAL is mapped to 8 bytes (Double/f64) I would like to propose that I update the sqlite .slt files and change: 'AS REAL' -> 'AS DOUBLE' This would better match the actual types being tested and would fix many of the failing results. Along with correcting the nullif type behavior would fix almost all the remaining tests that have result mismatches with sqlite/postgresql. Thoughts? @alamb, @aweltsch, @2010YOUY01, @jayzhan-synnada -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [I] Automatically check "invariants" [datafusion]
wiedld commented on issue #13652: URL: https://github.com/apache/datafusion/issues/13652#issuecomment-2573659546 > I suggest we use this ticket to track the infrastructure for checking invariants Agreed. Modifying [this list above](https://github.com/apache/datafusion/issues/13652#issuecomment-2550443777), we have infrastructure components of: - [x] Define infrastructure to check LP invariant. PR: [Introduce LogicalPlan invariants, begin automatically checking them #13651](https://github.com/apache/datafusion/pull/13651) - [ ] Define infrastructure to check physical plan invariants: [WIP: Proposed interface for physical plan invariant checking. #13986](https://github.com/apache/datafusion/pull/13986) - [ ] Define infrastructure for user-defined invariants. See issue: TODO -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support [datafusion-comet]
kazuyukitanimura commented on code in PR #1192: URL: https://github.com/apache/datafusion-comet/pull/1192#discussion_r1904549692

## native/core/src/execution/shuffle/shuffle_writer.rs: ##
@@ -1567,17 +1585,41 @@ pub fn write_ipc_compressed(
     let mut timer = ipc_time.timer();
     let start_pos = output.stream_position()?;
-    // write ipc_length placeholder
-    output.write_all(&[0u8; 8])?;
+    // seek past ipc_length placeholder
+    output.seek_relative(8)?;

Review Comment: @andygrove Actually IIUC, seek_relative is seeking forward, correct?

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org
Re: [PR] feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support [datafusion-comet]
andygrove commented on code in PR #1192: URL: https://github.com/apache/datafusion-comet/pull/1192#discussion_r1904551983

## spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala: ##
@@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet.execution.shuffle
+
+import java.io.{EOFException, InputStream}
+import java.nio.{ByteBuffer, ByteOrder}
+import java.nio.channels.{Channels, ReadableByteChannel}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import org.apache.comet.Native
+import org.apache.comet.vector.NativeUtil
+
+/**
+ * This iterator wraps a Spark input stream that is reading shuffle blocks generated by the Comet
+ * native ShuffleWriterExec and then calls native code to decompress and decode the shuffle blocks
+ * and use Arrow FFI to return the Arrow record batch.
+ */
+case class NativeBatchDecoderIterator(
+    var in: InputStream,
+    taskContext: TaskContext,
+    decodeTime: SQLMetric)
+    extends Iterator[ColumnarBatch] {
+
+  private var isClosed = false
+  private val longBuf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
+  private val native = new Native()
+  private val nativeUtil = new NativeUtil()
+  private var currentBatch: ColumnarBatch = null
+  private var batch = fetchNext()
+
+  import NativeBatchDecoderIterator.threadLocalDataBuf
+
+  if (taskContext != null) {
+    taskContext.addTaskCompletionListener[Unit](_ => {
+      close()
+    })
+  }
+
+  private val channel: ReadableByteChannel = if (in != null) {
+    Channels.newChannel(in)
+  } else {
+    null
+  }
+
+  def hasNext(): Boolean = {
+    if (channel == null || isClosed) {
+      return false
+    }
+    if (batch.isDefined) {
+      return true
+    }
+
+    // Release the previous batch.
+    if (currentBatch != null) {
+      currentBatch.close()
+      currentBatch = null
+    }
+
+    batch = fetchNext()
+    if (batch.isEmpty) {
+      close()
+      return false
+    }
+    true
+  }
+
+  def next(): ColumnarBatch = {
+    if (!hasNext) {
+      throw new NoSuchElementException
+    }
+
+    val nextBatch = batch.get
+
+    currentBatch = nextBatch
+    batch = None
+    currentBatch
+  }
+
+  private def fetchNext(): Option[ColumnarBatch] = {
+    if (channel == null || isClosed) {
+      return None
+    }
+
+    // read compressed batch size from header
+    try {
+      longBuf.clear()
+      while (longBuf.hasRemaining && channel.read(longBuf) >= 0) {}
+    } catch {
+      case _: EOFException =>
+        close()
+        return None
+    }
+
+    // If we reach the end of the stream, we are done, or if we read partial length
+    // then the stream is corrupted.
+    if (longBuf.hasRemaining) {
+      if (longBuf.position() == 0) {
+        close()
+        return None
+      }
+      throw new EOFException("Data corrupt: unexpected EOF while reading compressed ipc lengths")
+    }
+
+    // get compressed length (including headers)
+    longBuf.flip()
+    val compressedLength = longBuf.getLong.toInt
+
+    // read field count from header
+    longBuf.clear()
+    while (longBuf.hasRemaining && channel.read(longBuf) >= 0) {}
+    if (longBuf.hasRemaining) {
+      throw new EOFException("Data corrupt: unexpected EOF while reading field count")
+    }
+    longBuf.flip()
+    val fieldCount = longBuf.getLong.toInt
+
+    // read body
+    val bytesToRead = compressedLength - 8
+    var dataBuf = threadLocalDataBuf.get()
+    if (dataBuf.capacity() < bytesToRead) {
+      // it is unlikely that we would overflow here since it would
+      // require a 1GB compressed shuffle block but we check anyway
+      val newCapacity = (bytesToRead * 2L).min(Integer.MAX_VALUE).toInt

Review Comment: `bytesToRead` is a positive 32-bit integer, though, and `Int.MaxValue * 2L` cannot exceed `Long.MaxValue`.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@data
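The overflow concern in this thread is about the capacity-doubling clamp: the doubling is done in 64-bit arithmetic and then clamped to `Integer.MAX_VALUE`, so a positive 32-bit `bytesToRead` cannot overflow. A sketch of the same clamp in Python (function name is hypothetical):

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE

def grow_capacity(bytes_to_read: int) -> int:
    """Double the requested buffer size but clamp at Integer.MAX_VALUE.
    Doing the doubling before the clamp mirrors `(bytesToRead * 2L).min(...)`,
    where the `2L` forces 64-bit math so the doubling itself cannot overflow."""
    return min(bytes_to_read * 2, INT_MAX)

print(grow_capacity(1024))     # 2048
print(grow_capacity(INT_MAX))  # 2147483647
```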
Re: [PR] feat: Move shuffle block decompression and decoding to native code and add LZ4 & Snappy support [datafusion-comet]
andygrove commented on code in PR #1192:
URL: https://github.com/apache/datafusion-comet/pull/1192#discussion_r1904587549

## spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/NativeBatchDecoderIterator.scala:
## @@ -0,0 +1,184 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.spark.sql.comet.execution.shuffle
+
+import java.io.{EOFException, InputStream}
+import java.nio.{ByteBuffer, ByteOrder}
+import java.nio.channels.{Channels, ReadableByteChannel}
+
+import org.apache.spark.TaskContext
+import org.apache.spark.sql.execution.metric.SQLMetric
+import org.apache.spark.sql.vectorized.ColumnarBatch
+
+import org.apache.comet.Native
+import org.apache.comet.vector.NativeUtil
+
+/**
+ * This iterator wraps a Spark input stream that is reading shuffle blocks generated by the Comet
+ * native ShuffleWriterExec and then calls native code to decompress and decode the shuffle blocks
+ * and use Arrow FFI to return the Arrow record batch.
+ */
+case class NativeBatchDecoderIterator(
+    var in: InputStream,
+    taskContext: TaskContext,
+    decodeTime: SQLMetric)
+    extends Iterator[ColumnarBatch] {
+
+  private var isClosed = false
+  private val longBuf = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN)
+  private val native = new Native()
+  private val nativeUtil = new NativeUtil()
+  private var currentBatch: ColumnarBatch = null
+  private var batch = fetchNext()
+
+  import NativeBatchDecoderIterator.threadLocalDataBuf
+
+  if (taskContext != null) {
+    taskContext.addTaskCompletionListener[Unit](_ => {
+      close()
+    })
+  }
+
+  private val channel: ReadableByteChannel = if (in != null) {
+    Channels.newChannel(in)
+  } else {
+    null
+  }
+
+  def hasNext(): Boolean = {
+    if (channel == null || isClosed) {
+      return false
+    }
+    if (batch.isDefined) {
+      return true
+    }
+
+    // Release the previous batch.
+    if (currentBatch != null) {
+      currentBatch.close()
+      currentBatch = null
+    }
+
+    batch = fetchNext()
+    if (batch.isEmpty) {
+      close()
+      return false
+    }
+    true
+  }
+
+  def next(): ColumnarBatch = {
+    if (!hasNext) {
+      throw new NoSuchElementException
+    }
+
+    val nextBatch = batch.get
+
+    currentBatch = nextBatch
+    batch = None
+    currentBatch
+  }
+
+  private def fetchNext(): Option[ColumnarBatch] = {
+    if (channel == null || isClosed) {
+      return None
+    }
+
+    // read compressed batch size from header
+    try {
+      longBuf.clear()
+      while (longBuf.hasRemaining && channel.read(longBuf) >= 0) {}
+    } catch {
+      case _: EOFException =>
+        close()
+        return None
+    }
+
+    // If we reach the end of the stream, we are done, or if we read partial length
+    // then the stream is corrupted.
+    if (longBuf.hasRemaining) {
+      if (longBuf.position() == 0) {
+        close()
+        return None
+      }
+      throw new EOFException("Data corrupt: unexpected EOF while reading compressed ipc lengths")
+    }
+
+    // get compressed length (including headers)
+    longBuf.flip()
+    val compressedLength = longBuf.getLong.toInt
+
+    // read field count from header
+    longBuf.clear()
+    while (longBuf.hasRemaining && channel.read(longBuf) >= 0) {}
+    if (longBuf.hasRemaining) {
+      throw new EOFException("Data corrupt: unexpected EOF while reading field count")
+    }
+    longBuf.flip()
+    val fieldCount = longBuf.getLong.toInt
+
+    // read body
+    val bytesToRead = compressedLength - 8
+    var dataBuf = threadLocalDataBuf.get()
+    if (dataBuf.capacity() < bytesToRead) {
+      // it is unlikely that we would overflow here since it would
+      // require a 1GB compressed shuffle block but we check anyway
+      val newCapacity = (bytesToRead * 2L).min(Integer.MAX_VALUE).toInt

Review Comment:
   I've now implemented a hard limit of 2GB shuffle block size both in the writer and reader
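For readers unfamiliar with the framing the decoder above expects, the header layout it reads is an 8-byte little-endian compressed length (which includes the 8-byte field-count header that follows), then an 8-byte little-endian field count, then the body. The sketch below re-implements that framing in Python purely as an illustration; it is not Comet's actual reader, and only the stream layout is taken from the quoted Scala code.

```python
import struct
from io import BytesIO

def read_block(stream):
    # Read the 8-byte little-endian compressed length. Per the Scala
    # reader, it includes the 8-byte field-count header that follows.
    header = stream.read(8)
    if len(header) == 0:
        return None  # clean EOF between blocks
    if len(header) < 8:
        raise EOFError("unexpected EOF while reading compressed length")
    (compressed_length,) = struct.unpack("<q", header)

    field_header = stream.read(8)
    if len(field_header) < 8:
        raise EOFError("unexpected EOF while reading field count")
    (field_count,) = struct.unpack("<q", field_header)

    body = stream.read(compressed_length - 8)
    if len(body) < compressed_length - 8:
        raise EOFError("unexpected EOF while reading block body")
    return field_count, body

def write_block(field_count, body):
    # Inverse operation: frame a body with the two little-endian headers.
    return struct.pack("<q", len(body) + 8) + struct.pack("<q", field_count) + body
```

Round-tripping a block through `write_block` and `read_block` shows why a partial 8-byte header read can only mean corruption: the length prefix is written before anything else.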
[PR] Unparsing optimized (> 2 inputs) unions [datafusion]
MohamedAbdeen21 opened a new pull request, #14031:
URL: https://github.com/apache/datafusion/pull/14031

## Which issue does this PR close?

Closes #13621.

## Rationale for this change

Unparsing unions with more than 2 inputs (produced by the logical optimizer).

## What changes are included in this PR?

Tests and required changes to the unparser.

## Are these changes tested?

Yes.

## Are there any user-facing changes?

Should be able to unparse most unions.
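The optimizer can collapse nested binary unions into a single Union node with more than two inputs, and the unparser has to turn that back into SQL. Because SQL's UNION chains left-associatively, joining the unparsed inputs with the set operator reproduces an equivalent query. This is only a sketch of the idea; the function below is hypothetical and not DataFusion's unparser API.

```python
def unparse_union(inputs, union_all=True):
    # inputs: already-unparsed SQL strings for each union branch.
    # An optimized Union node may carry any number of branches (> 2);
    # chaining them with the binary operator restores valid SQL.
    if not inputs:
        raise ValueError("union requires at least one input")
    op = " UNION ALL " if union_all else " UNION "
    return op.join(inputs)
```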
Re: [PR] Unparsing optimized (> 2 inputs) unions [datafusion]
MohamedAbdeen21 commented on PR #14031:
URL: https://github.com/apache/datafusion/pull/14031#issuecomment-2574047801

Looks like there's a circular dependency between the optimizer and SQL packages. The easiest solution is moving the test somewhere else; not sure where, though.
Re: [PR] Default to ZSTD compression when writing Parquet [datafusion-python]
kylebarron commented on code in PR #981:
URL: https://github.com/apache/datafusion-python/pull/981#discussion_r1904715954

## python/datafusion/dataframe.py:
## @@ -620,16 +620,24 @@ def write_csv(self, path: str | pathlib.Path, with_header: bool = False) -> None

     def write_parquet(
         self,
         path: str | pathlib.Path,
-        compression: str = "uncompressed",
+        compression: str = "ZSTD",
         compression_level: int | None = None,
     ) -> None:
         """Execute the :py:class:`DataFrame` and write the results to a Parquet file.

         Args:
-            path: Path of the Parquet file to write.
-            compression: Compression type to use.
-            compression_level: Compression level to use.
-        """
+            path (str | pathlib.Path): The file path to write the Parquet file.
+            compression (str): The compression algorithm to use. Default is "ZSTD".
+            compression_level (int | None): The compression level to use. For ZSTD, the
+                recommended range is 1 to 22, with the default being 3. Higher levels
+                provide better compression but slower speed.
+        """
+        # default compression level to 3 for ZSTD
+        if compression == "ZSTD":
+            if compression_level is None:
+                compression_level = 3

Review Comment:
   Sure, that sounds good to me.
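The review above is about defaulting the ZSTD level when the caller does not pass one. A small standalone sketch of that normalization logic follows; the function name and the validation are illustrative, not the datafusion-python API.

```python
DEFAULT_ZSTD_LEVEL = 3  # ZSTD's conventional default level

def normalize_parquet_compression(compression="ZSTD", compression_level=None):
    # Fill in a sensible default level only for codecs that take one.
    # ZSTD's documented range is 1..22; 3 is the usual default.
    compression = compression.upper()
    if compression == "ZSTD":
        if compression_level is None:
            compression_level = DEFAULT_ZSTD_LEVEL
        if not 1 <= compression_level <= 22:
            raise ValueError("ZSTD compression level must be in 1..22")
    return compression, compression_level
```

Keeping the default in one helper (rather than inline in `write_parquet`) makes it reusable by other writers that accept the same options.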
Re: [PR] feat: reads using global ctx [datafusion-python]
kylebarron commented on code in PR #982:
URL: https://github.com/apache/datafusion-python/pull/982#discussion_r1904717182

## python/datafusion/io.py:
## @@ -0,0 +1,181 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""IO read functions using global context."""
+
+import pathlib
+
+from datafusion.dataframe import DataFrame
+from datafusion.expr import Expr
+import pyarrow

Review Comment:
   Side note: it would be great to use ruff (https://stackoverflow.com/a/77876298) or isort to deterministically and programmatically sort python imports, and validate that in CI. I think isort/ruff would have a newline here between the third-party and first-party imports.
Re: [I] supports_filters_pushdown is invoked more than once on a single Custom Data Source [datafusion]
cisaacson commented on issue #13994:
URL: https://github.com/apache/datafusion/issues/13994#issuecomment-2574327502

Thanks @jonahgao, this is very helpful. The documentation does not fully reflect this; I will try to update it.

The way I have things now, I am not dependent on the DataFusion `filters`, but I want it to know what my custom data source can do so that it does not do the work itself. I see now that if it knew a `filter` was labeled `Exact`, and we later return all `Unsupported` from subsequent calls, the first `Exact` should be respected. But then the `filter` may not be pushed down, as the DataFusion plan does not require it. I will still return the right data regardless, the way I have it now. This did require an implementation external to DataFusion for my custom data source.

Unless there is an important reason to keep a non-mutable `&self` in `supports_filters_pushdown`, I would recommend changing it to `&mut self`. This should be OK, as you can only modify the custom data source struct, which the implementor owns. Do you agree? If so, I will file an enhancement issue.

@alamb you have been watching this issue I believe; let me know if you have an opinion on making mutable references for some of these functions.
Re: [I] supports_filters_pushdown is invoked more than once on a single Custom Data Source [datafusion]
cisaacson commented on issue #13994:
URL: https://github.com/apache/datafusion/issues/13994#issuecomment-2573291404

@jonahgao Thanks for explaining this. We can probably work with this, but the issue is that since we want some `filters` and not others (in other words, some are preferred indexes), we need to keep state between `supports_filters_pushdown` calls. The trait for this fn is `&self` and not `&mut self`, so the only way for us to track it is with something like `OnceLock` to maintain the state. I asked about this a while back; if `&mut self` were passed to this fn, it would be much easier to control the functionality.

One last question: if I have a query with `[filterA, filterB]` and on the initial call I return `[Unsupported, Exact]`, and on a subsequent call you send me `[filterA]` and I return `[Unsupported]`, then I believe the `scan` `filters` come through as `[]` (empty). Is DataFusion still respecting the `Exact` on `filterB`? My guess is not, and that is another important problem: if my custom data source only sends back data that matches `filterB`, and DataFusion then filters all result data by `filterB` again, the pushdown is not doing anything. I see now that it does work, because my data source did filter the data properly, but my guess is DataFusion is doing extra work it should not do.

For now your explanation gives me a workaround (given the inefficiency I just explained; not a deal breaker, but it could be improved).
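To make the Exact/Unsupported discussion concrete, here is a small, stateless sketch of how a provider might classify filters against a set of indexed columns. This is illustrative Python, not the Rust `TableProvider` trait; because the answer is a pure function of each filter, repeated calls (as described in the thread) always agree with one another, which sidesteps the `&self` vs `&mut self` question.

```python
INDEXED_COLUMNS = {"k"}  # columns our hypothetical source can filter exactly

def supports_filters_pushdown(filter_columns):
    # filter_columns: one set of referenced column names per filter.
    # Report Exact if every column a filter references is backed by an
    # index, otherwise Unsupported. Stateless, so the classification is
    # stable no matter how many times the planner asks.
    return [
        "Exact" if cols and cols <= INDEXED_COLUMNS else "Unsupported"
        for cols in filter_columns
    ]
```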
Re: [I] Memory account not adding up in SortExec [datafusion]
westonpace commented on issue #10073:
URL: https://github.com/apache/datafusion/issues/10073#issuecomment-2573298788

> FWIW I'm still seeing the same issue through LanceDB (https://github.com/lancedb/lance/issues/2119#issuecomment-2136414811).

This isn't necessarily indicative as Lance lags behind Datafusion (currently we are at 42 which is 4 months behind). However, I just updated my local lance to release 44 (which should contain the potential fix @alamb is alluding to) and confirmed that the issue is still not fixed.

This also doesn't surprise me. I think the issue here is not double-counting but rather is dealing with the fact that a string array uses more memory after sorting than it was using before sorting (and so we run out of memory trying to spill). I'll try and find some time today to create a pure datafusion reproducer.
Re: [PR] Add support for the SQL OVERLAPS predicate [datafusion-sqlparser-rs]
iffyio merged PR #1638:
URL: https://github.com/apache/datafusion-sqlparser-rs/pull/1638
Re: [PR] Support pluralized time units [datafusion-sqlparser-rs]
iffyio commented on code in PR #1630:
URL: https://github.com/apache/datafusion-sqlparser-rs/pull/1630#discussion_r1904299807

## src/parser/mod.rs:
## @@ -2353,14 +2355,30 @@ impl<'a> Parser<'a> {
                 };
                 Ok(DateTimeField::Week(week_day))
             }
+            Keyword::WEEKS => {
+                let week_day = if dialect_of!(self is BigQueryDialect)
+                    && self.consume_token(&Token::LParen)
+                {
+                    let week_day = self.parse_identifier()?;
+                    self.expect_token(&Token::RParen)?;
+                    Some(week_day)
+                } else {
+                    None
+                };
+                Ok(DateTimeField::Weeks(week_day))
+            }

Review Comment:
   @wugeer I think the code still references BigQuery, realized the tests don't seem to cover this scenario, but actually I'm also wondering which dialect supports these pluralized time units (do you have a link to the docs)?
Re: [PR] Add support for Snowflake LIST and REMOVE [datafusion-sqlparser-rs]
iffyio merged PR #1639:
URL: https://github.com/apache/datafusion-sqlparser-rs/pull/1639
Re: [I] sql odd case of rounding compared to duckdb and postgresql [datafusion]
Omega359 commented on issue #13781:
URL: https://github.com/apache/datafusion/issues/13781#issuecomment-2573381963

I suspect much of this has the same cause as #13780: nullif typing being incorrect, and real mapping to f32, where it is not possible to represent some integers exactly. PostgreSQL also seems to have an interesting casting behavior: if you cast an int to a real, it may ignore the cast and implicitly cast it back to an int, as long as there is no dependency requiring it to be a real (division, for example).
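A quick way to see why `real` (f32) breaks exactness for some integers: an IEEE-754 single-precision float has a 24-bit significand, so 2^24 + 1 = 16777217 is the first integer it cannot represent. The Python snippet below emulates an f32 column by round-tripping through `struct`'s single-precision format.

```python
import struct

def to_f32(x):
    # Round-trip a Python float through IEEE-754 single precision,
    # the way a SQL REAL / Rust f32 column would store it. Values
    # needing more than 24 significand bits come back changed.
    return struct.unpack("<f", struct.pack("<f", x))[0]
```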
Re: [PR] feat: add test to check for `ctx.enable_url_table()` [datafusion-ballista]
andygrove merged PR #1155:
URL: https://github.com/apache/datafusion-ballista/pull/1155
Re: [PR] chore: no need to run python test in rust [datafusion-ballista]
andygrove merged PR #1154:
URL: https://github.com/apache/datafusion-ballista/pull/1154
Re: [I] Memory account not adding up in SortExec [datafusion]
westonpace commented on issue #10073:
URL: https://github.com/apache/datafusion/issues/10073#issuecomment-2573542512

Here's a pure-rust datafusion-only example: https://github.com/westonpace/arrow-datafusion/commit/26ed75c51ad649a274063ad3fa1262b7025a17cf

It takes a bit of time on the first run to generate the strings test file (it probably doesn't need to be so big). After that it reproduces the issue quickly. I've also added some prints that hopefully highlight the issue.

Before we do an in-memory sort we have ~5MB of unsorted string data. After sorting we have 8MB of sorted string data. This is not surprising to me. During the sort we are probably building a string array, probably using some kind of resize-on-append string building that is doubling, and we end up with ~8MB because the amount we need is between 4MB and 8MB. Unfortunately, this leads to a failure, which it probably should not.

I think @alamb had some good suggestions [in this comment](https://github.com/apache/datafusion/issues/10073#issuecomment-2056571501)
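The 5MB-becomes-8MB jump described above is exactly what a doubling, resize-on-append buffer produces: capacity grows in powers of two, so any requirement between 4MiB and 8MiB lands on 8MiB. The growth policy below is the classic amortized-append one and is assumed for illustration; Arrow's builders may differ in the details.

```python
def grown_capacity(needed, initial=1):
    # Double from the initial capacity until the buffer can hold
    # `needed` bytes: the classic amortized O(1)-append policy.
    cap = initial
    while cap < needed:
        cap *= 2
    return cap
```

For a sort that needs just over 4MiB of string data, the final buffer is 8MiB, which is why the memory reservation taken before the rebuild can come up short.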
Re: [I] Support pruning on string columns using LIKE [datafusion]
alamb commented on issue #507:
URL: https://github.com/apache/datafusion/issues/507#issuecomment-2573539614

Filed the following ticket to support `starts_with`: 🎣
- https://github.com/apache/datafusion/issues/14027
[PR] feat(optimizer): Enable filter pushdown on window functions [datafusion]
nuno-faria opened a new pull request, #14026:
URL: https://github.com/apache/datafusion/pull/14026

Ensures selections can be pushed past window functions, similarly to what is already done with aggregations, when possible. Unlike aggregations, however, extra care must be taken when handling multiple functions in the same Window operator.

## Which issue does this PR close?

N/A.

## Rationale for this change

Pushing filters past window functions allows for a more efficient execution, as data are filtered closer to the root. Optimizers such as the one used by Postgres already support this optimization.

Example:

```sql
CREATE TABLE t (k int, v int);

-- filter is pushed past the window function
EXPLAIN SELECT * FROM (
    SELECT *, rank() OVER(PARTITION BY k) FROM t
) WHERE k = 1; -- filter column is used by the partition key

     QUERY PLAN
-
 WindowAgg
   -> Seq Scan on t
        Filter: (k = 1)

-- filter is not pushed past the window function (not used in the partitioning)
EXPLAIN SELECT * FROM (
    SELECT *, rank() OVER(PARTITION BY k) FROM t
) WHERE v = 1;

     QUERY PLAN
-
 Subquery Scan on unnamed_subquery
   Filter: (unnamed_subquery.v = 1)
   -> WindowAgg
        -> Sort
             Sort Key: t.k
             -> Seq Scan on t
```

## What changes are included in this PR?

- Added a new match arm handling `LogicalPlan::Window` in `optimizer/push_down_filters.rs`.
- Added new tests to `optimizer/push_down_filters.rs`.
- Added new tests to `sqllogictest/window.slt`.

## Are these changes tested?

Yes.

## Are there any user-facing changes?

No.
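The safety condition behind both plans above can be summarized: a predicate may move below a window operator only if every column it references appears in the PARTITION BY of every window expression, because then rows within any partition are either all kept or all dropped and the window results are unchanged. A simplified sketch of that check follows; the real optimizer rule also has to handle details such as volatile expressions, which are ignored here.

```python
def can_push_past_window(filter_columns, window_partition_keys):
    # filter_columns: set of column names the predicate references.
    # window_partition_keys: one set of PARTITION BY columns per window
    # expression in the Window operator.
    # Safe only if every window expression partitions by (at least)
    # every column the filter uses.
    return all(filter_columns <= set(keys) for keys in window_partition_keys)
```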
[I] Support pruning on `starts_with` [datafusion]
alamb opened a new issue, #14027:
URL: https://github.com/apache/datafusion/issues/14027

### Is your feature request related to a problem or challenge?

@adriangb implemented `PruningPredicate` support for prefix matching `LIKE` / `NOT LIKE` in
- https://github.com/apache/datafusion/pull/12978

However, it isn't currently supported for the `starts_with` function.

### Describe the solution you'd like

I would like predicate pruning to happen for the `starts_with` function as well. So queries like

```sql
select * from my_file where starts_with(col, 'http://')
```

could also use `starts_with` to prune parquet files.

### Describe alternatives you've considered

The challenge at the moment is that `PruningPredicate` can't refer directly to the function implementations.

Given how optimized LIKE is, one possible solution would be to change `starts_with` so it didn't just call an arrow kernel, but instead was rewritten:
https://github.com/apache/datafusion/blob/main/datafusion/functions/src/string/starts_with.rs

So for example, it could be rewritten into [`Expr::Like`](https://docs.rs/datafusion/latest/datafusion/prelude/enum.Expr.html#variant.Like) by implementing `simplify`:
https://docs.rs/datafusion/latest/datafusion/logical_expr/trait.ScalarUDFImpl.html#method.simplify

We could do something similar with `ends_with` as well.

### Additional context

_No response_
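The rewrite the issue proposes amounts to turning `starts_with(col, prefix)` into `col LIKE 'prefix%'`, with the prefix's LIKE metacharacters escaped so the semantics don't change (e.g. `'http://'` becomes the pattern `'http://%'`). The pattern construction can be sketched language-agnostically; the function below is illustrative only, since the real change would live in the Rust `ScalarUDFImpl::simplify` implementation.

```python
def starts_with_to_like_pattern(prefix, escape_char="\\"):
    # Escape LIKE wildcards (% and _) and the escape character itself,
    # then append % so the pattern matches any suffix of the prefix.
    escaped = []
    for ch in prefix:
        if ch in ("%", "_", escape_char):
            escaped.append(escape_char)
        escaped.append(ch)
    return "".join(escaped) + "%"
```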
Re: [I] Support pruning on `starts_with` [datafusion]
alamb commented on issue #14027:
URL: https://github.com/apache/datafusion/issues/14027#issuecomment-2573538810

I think this is a good first issue, as rewriting a function should be straightforward and doesn't require in-depth knowledge of the rest of the engine.
Re: [I] Ballista 43.0.0 Release [datafusion-ballista]
andygrove commented on issue #974:
URL: https://github.com/apache/datafusion-ballista/issues/974#issuecomment-2573567119

Sure, let's do it. Can you create a PR against `main` to update version numbers and add the changelog?
[I] fail to parse `set ez.grouping.max-size=1234;` in Hive Dialect [datafusion-sqlparser-rs]
wugeer opened a new issue, #1643:
URL: https://github.com/apache/datafusion-sqlparser-rs/issues/1643

According to the Apache Tez code, the parameter `tez.grouping.max-size` is supported:
https://github.com/apache/tez/blob/1e6c9e3448bb9d934508ee995ad60c23dafa0610/tez-mapreduce/src/main/java/org/apache/tez/mapreduce/grouper/TezSplitGrouper.java#L72

Also, when Hive uses the Tez engine, it is possible to set the `tez.grouping.max-size` parameter, but the current version fails to parse it.
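The parse failure comes down to the variable name: `tez.grouping.max-size` mixes dots and hyphens, which a parser that only accepts plain identifier tokens will reject. A toy sketch of the more permissive key rule a Hive-style dialect needs is below; this is illustrative Python, not sqlparser-rs code.

```python
import re

# Hive-style SET statements allow dotted, hyphenated keys, e.g.
#   set tez.grouping.max-size=1234;
SET_RE = re.compile(r"^\s*set\s+([A-Za-z_][\w.-]*)\s*=\s*(.+?)\s*;?\s*$", re.IGNORECASE)

def parse_set(stmt):
    # Return (key, value) for a SET statement, or raise if it does not
    # match the permissive key grammar.
    m = SET_RE.match(stmt)
    if not m:
        raise ValueError(f"cannot parse SET statement: {stmt!r}")
    return m.group(1), m.group(2)
```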
Re: [I] Panic in a query with NATURAL JOIN (SQLancer) [datafusion]
alamb commented on issue #14015:
URL: https://github.com/apache/datafusion/issues/14015#issuecomment-2573011842

Thanks @2010YOUY01 and @jonahgao