[PR] [SPARK-50325][SQL] Factor out alias resolution to be reused in the single-pass Analyzer [spark]
vladimirg-db opened a new pull request, #48857: URL: https://github.com/apache/spark/pull/48857

### What changes were proposed in this pull request?
Factor out alias resolution code to the `AliasResolution` object.

### Why are the changes needed?
Some Analyzer code will be used in both fixed-point and single-pass Analyzers. Also, Analyzer.scala is 4K+ lines long, so it makes sense to gradually split it. Context: https://issues.apache.org/jira/browse/SPARK-49834

### Does this PR introduce _any_ user-facing change?
No. It's a pure refactoring.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.
Re: [PR] [SPARK-50313][SQL][TESTS] Enable ANSI in SQL *SQLQueryTestSuite by default [spark]
yaooqinn commented on code in PR #48842: URL: https://github.com/apache/spark/pull/48842#discussion_r1843367043

## sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql:
@@ -88,7 +88,7 @@ SELECT CAST(10 AS DECIMAL(10, 2)) div CAST(3 AS DECIMAL(5, 1));
 set spark.sql.decimalOperations.allowPrecisionLoss=false;
 -- test decimal operations
-select id, a+b, a-b, a*b, a/b from decimals_test order by id;
+select /*+ COALESCE(1) */ id, a+b, a-b, a*b, a/b from decimals_test order by id;

Review Comment: stabilize error
Re: [PR] [SPARK-50313][SQL][TESTS] Enable ANSI in SQL *SQLQueryTestSuite by default [spark]
yaooqinn commented on code in PR #48842: URL: https://github.com/apache/spark/pull/48842#discussion_r1843366680

## sql/core/src/test/resources/sql-tests/inputs/udf/udf-union.sql:
@@ -11,7 +11,7 @@ FROM (SELECT udf(c1) as c1, udf(c2) as c2 FROM t1
 -- Type Coerced Union
 SELECT udf(c1) as c1, udf(c2) as c2
-FROM (SELECT udf(c1) as c1, udf(c2) as c2 FROM t1
+FROM (SELECT udf(c1) as c1, udf(c2) as c2 FROM t1 WHERE c2 = 'a'

Review Comment: stabilize error

## sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql:
@@ -548,7 +548,8 @@ values (0, 'abc') tab(x, y)
 -- Union distinct with a VALUES list.
 values (0, 1) tab(x, y)
-|> union table t;
+|> union table t
+|> where x = 0;

Review Comment: stabilize error
[PR] [SPARK-50327][SQL] Factor out function resolution to be reused in the single-pass Analyzer [spark]
vladimirg-db opened a new pull request, #48858: URL: https://github.com/apache/spark/pull/48858

### What changes were proposed in this pull request?
Factor out function resolution code to the `FunctionResolution` object.

### Why are the changes needed?
Some Analyzer code will be used in both fixed-point and single-pass Analyzers. Also, Analyzer.scala is 4K+ lines long, so it makes sense to gradually split it. Context: https://issues.apache.org/jira/browse/SPARK-49834

### Does this PR introduce _any_ user-facing change?
No. It's a pure refactoring.

### How was this patch tested?
Existing tests.

### Was this patch authored or co-authored using generative AI tooling?
No.
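For orientation, a minimal sketch of the factoring-out pattern: the constructor signature below mirrors the `FunctionResolution` class quoted later in this digest, while the `resolveFunction` method is a simplified placeholder rather than the actual API.

```scala
import org.apache.spark.sql.catalyst.analysis.{RelationResolution, UnresolvedFunction}
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.connector.catalog.{CatalogManager, LookupCatalog}
import org.apache.spark.sql.errors.DataTypeErrorsBase

// Resolution logic formerly inlined in Analyzer.scala moves into a standalone class,
// so both the fixed-point rule and the single-pass resolver can delegate to it.
class FunctionResolution(
    override val catalogManager: CatalogManager,
    relationResolution: RelationResolution)
  extends DataTypeErrorsBase with LookupCatalog {

  // Placeholder body: the real code resolves the unresolved function against the
  // function catalogs and returns the bound expression.
  def resolveFunction(u: UnresolvedFunction): Expression = ???
}
```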
Re: [PR] [SPARK-50309][SQL] Add documentation for SQL pipe syntax [spark]
cloud-fan commented on code in PR #48852: URL: https://github.com/apache/spark/pull/48852#discussion_r1843507956 ## docs/sql-pipe-syntax.md: ## @@ -0,0 +1,540 @@ +--- +layout: global +title: SQL Pipe Syntax +displayTitle: SQL Pipe Syntax +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +### Syntax + + Overview + +Apache Spark supports SQL pipe syntax which allows composing queries from combinations of operators. + +* Any query can have zero or more pipe operators as a suffix, delineated by the pipe character `|>`. +* Each pipe operator starts with one or more SQL keywords followed by its own grammar as described + in the table below. +* Most of these operators reuse existing grammar for standard SQL clauses. +* Operators can apply in any order, any number of times. + +`FROM ` is now a supported standalone query which behaves the same as +`TABLE `. This provides a convenient starting place to begin a chained pipe SQL query, +although it is possible to add one or more pipe operators to the end of any valid Spark SQL query +with the same consistent behavior as written here. + +Please refer to the table at the end of this document for a full list of all supported operators +and their semantics. + + Example + +For example, this is query 13 from the TPC-H benchmark: + +```sql +SELECT c_count, COUNT(*) AS custdist +FROM + (SELECT c_custkey, COUNT(o_orderkey) c_count + FROM customer + LEFT OUTER JOIN orders ON c_custkey = o_custkey + AND o_comment NOT LIKE '%unusual%packages%' GROUP BY c_custkey + ) AS c_orders +GROUP BY c_count +ORDER BY custdist DESC, c_count DESC; +``` + +To write the same logic using SQL pipe operators, we express it like this: + +```sql +FROM customer +|> LEFT OUTER JOIN orders ON c_custkey = o_custkey + AND o_comment NOT LIKE '%unusual%packages%' +|> AGGREGATE COUNT(o_orderkey) c_count + GROUP BY c_custkey +|> AGGREGATE COUNT(*) AS custdist + GROUP BY c_count +|> ORDER BY custdist DESC, c_count DESC; +``` + + Source Tables + +To start a new query using SQL pipe syntax, use the `FROM ` or `TABLE ` +clause, which creates a relation comprising all rows from the source table. Then append one or more +pipe operators to the end of this clause to perform further transformations. + + Projections + +SQL pipe syntax supports composable ways to evaluate expressions. A major advantage of these +projection features is that they support computing new expressions based on previous ones in an +incremental way. No lateral column references are needed here since each operator applies +independently on its input table, regardless of the order in which the operators appear. Each of +these computed columns then becomes visible to use with the following operator. + +`SELECT` produces a new table by evaluating the provided expressions. 
+It is possible to use `DISTINCT` and `*` as needed. +This works like the outermost `SELECT` in a table subquery in regular Spark SQL. + +`EXTEND` adds new columns to the input table by evaluating the provided expressions. +This also preserves table aliases. +This works like `SELECT *, new_column` in regular Spark SQL. + +`DROP` removes columns from the input table. +This is similar to `SELECT * EXCEPT (column)` in regular Spark SQL. + +`SET` replaces column values from the input table. +This is similar to `SELECT * REPLACE (expression AS column)` in regular Spark SQL. + +`AS` forwards the input table and introduces a new alias for each row. + + Aggregations + +In general, aggregation takes place differently using SQL pipe syntax as opposed to regular Spark +SQL. + +To perform full-table aggregation, use the `AGGREGATE` operator with a list of aggregate +expressions to evaluate. This returns one single row in the output table. + +To perform aggregation with grouping, use the `AGGREGATE` operator with a `GROUP BY` clause. +This returns one row for each unique combination of values of the grouping expressions. The output +table contains the evaluated grouping expressions followed by the evaluated aggregate functions. +Grouping expressions support assigning aliases for purposes of referring to them in future +operators. In this way, it is not necessary to re
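To make the projection and aggregation operators above concrete, here is a short illustrative query. It reuses the TPC-H `orders` table from the earlier example and is only a sketch of the syntax; the authoritative grammar is the operator table referenced at the end of the proposed document.

```sql
FROM orders
|> EXTEND o_totalprice * 0.1 AS tax       -- add a computed column
|> SET tax = round(tax, 2)                -- replace the value of an existing column
|> DROP o_comment                         -- remove a column
|> WHERE o_orderstatus = 'F'              -- filter rows
|> AGGREGATE SUM(tax) AS total_tax
   GROUP BY o_orderstatus
|> ORDER BY total_tax DESC;
```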
Re: [PR] [SPARK-48195][CORE] Save and reuse RDD/Broadcast created by SparkPlan [spark]
yaooqinn commented on PR #48037: URL: https://github.com/apache/spark/pull/48037#issuecomment-2478403269 It looks like the number of suppressed exceptions can be calculated as `2 * calls of Utils.doTryWithCallerStacktrace`.
Re: [PR] [SPARK-50313][SQL][TESTS] Enable ANSI in SQL *SQLQueryTestSuite by default [spark]
yaooqinn commented on PR #48842: URL: https://github.com/apache/spark/pull/48842#issuecomment-2478422326 cc @dongjoon-hyun as the initiator of SPARK-4, also cc @cloud-fan, thank you
Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]
cloud-fan commented on code in PR #48847: URL: https://github.com/apache/spark/pull/48847#discussion_r1843517881

## sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala:
@@ -741,4 +741,21 @@ class ParametersSuite extends QueryTest with SharedSparkSession with PlanTest {
       Row("c1"))
     }
   }
+
+  test("SPARK-50322: parameterized identifier in a sub-query") {
+    withTable("tt1") {
+      sql("create table tt1(c1 int)")
+      sql("insert into tt1 values (1)")
+      def query(p: String): String = {
+        s"""
+          |with v1 as (
+          |  select * from tt1
+          |  where 1 = (Select * from identifier($p))

Review Comment:
```suggestion
          |  where 1 = (select * from identifier($p))
```
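For readers unfamiliar with the feature under test, a hedged usage sketch of a parameterized `IDENTIFIER` inside a CTE sub-query via the named-parameter `sql` overload; the table name matches the test above, but the exact reproduction in the PR may differ.

```scala
import org.apache.spark.sql.SparkSession

object ParameterizedIdentifierSketch extends App {
  // Assumes a local session; `tt1` is created the same way as in the test above.
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  spark.sql("create table tt1(c1 int)")
  spark.sql("insert into tt1 values (1)")

  // The table name is passed as a named parameter and resolved through IDENTIFIER()
  // inside a sub-query of a CTE, which is the case SPARK-50322 fixes.
  val df = spark.sql(
    """with v1 as (
      |  select * from tt1
      |  where 1 = (select * from identifier(:tab))
      |)
      |select * from v1""".stripMargin,
    Map("tab" -> "tt1"))
  df.show()
  spark.stop()
}
```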
Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]
MaxGekk commented on code in PR #48847: URL: https://github.com/apache/spark/pull/48847#discussion_r1843594293

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala:
@@ -189,7 +189,8 @@ object BindParameters extends ParameterizedQueryProcessor with QueryErrorsBase {
     // We should wait for `CTESubstitution` to resolve CTE before binding parameters, as CTE
     // relations are not children of `UnresolvedWith`.

Review Comment: Can we do that in two steps: 1. fix the issue, 2. do the proposed refactoring? It seems less risky, so the first patch could be backported more safely.
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
cloud-fan commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843594523 ## sql/core/src/test/resources/sql-tests/results/listagg.sql.out: ## @@ -0,0 +1,436 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TEMP VIEW df AS +SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMP VIEW df2 AS +SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT listagg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT string_agg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b, NULL) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b) FROM df WHERE 1 != 1 +-- !query schema +struct +-- !query output +NULL + + +-- !query +SELECT listagg(b, '|') FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +b|c +c|d + + +-- !query +SELECT listagg(a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(DISTINCT a) FROM df +-- !query schema +struct +-- !query output +ab + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df +-- !query schema +struct +-- !query output +NULL +a +b +ba +ba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +b|a|b|a + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭB�� + + +-- !query +SELECT listagg(a), listagg(b, ',') FROM df2 +-- !query schema +struct +-- !query output +123true,false,false + + +-- !query +SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", Review Comment: what's the point of this test? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
cloud-fan commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843597510 ## sql/core/src/test/resources/sql-tests/results/listagg.sql.out: ## @@ -0,0 +1,436 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TEMP VIEW df AS +SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMP VIEW df2 AS +SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT listagg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT string_agg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b, NULL) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b) FROM df WHERE 1 != 1 +-- !query schema +struct +-- !query output +NULL + + +-- !query +SELECT listagg(b, '|') FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +b|c +c|d + + +-- !query +SELECT listagg(a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(DISTINCT a) FROM df +-- !query schema +struct +-- !query output +ab + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df +-- !query schema +struct +-- !query output +NULL +a +b +ba +ba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +b|a|b|a + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭB�� + + +-- !query +SELECT listagg(a), listagg(b, ',') FROM df2 +-- !query schema +struct +-- !query output +123true,false,false + + +-- !query +SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { +"error" : "','", +"hint" : "" + } +} + + +-- !query +SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES", + "sqlState" : "42K09", + "messageParameters" : { +"dataType" : "(\"BINARY\" or \"STRING\")", 
+"functionName" : "`listagg`", +"sqlExpr" : "\"listagg(c1, , )\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIndex" : 24, +"fragment" : "listagg(c1, ', ')" + } ] +} + + +-- !query +SELECT listagg(b, a) FROM df GROUP BY a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT", + "sqlState" : "42K09", + "messageParameters" : { +"inputExpr" : "\"a\"", +"inputName" : "`delimiter`", +"inputType" : "\"STRING\"", +"sqlExpr" : "\"listagg(b, a)\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIndex" : 20, +"fragment" : "listagg(b, a)" + } ] +} + + +-- !query +SELECT listagg(a) OVER (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +NULL +aa +aa +aabb +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + "sqlState" : "42601", + "messageParameters" : { +"aggFunc" : "\"listagg(a, NULL, a)\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIndex"
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
cloud-fan commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843595865 ## sql/core/src/test/resources/sql-tests/results/listagg.sql.out: ## @@ -0,0 +1,436 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TEMP VIEW df AS +SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMP VIEW df2 AS +SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT listagg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT string_agg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b, NULL) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b) FROM df WHERE 1 != 1 +-- !query schema +struct +-- !query output +NULL + + +-- !query +SELECT listagg(b, '|') FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +b|c +c|d + + +-- !query +SELECT listagg(a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(DISTINCT a) FROM df +-- !query schema +struct +-- !query output +ab + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df +-- !query schema +struct +-- !query output +NULL +a +b +ba +ba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +b|a|b|a + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭB�� + + +-- !query +SELECT listagg(a), listagg(b, ',') FROM df2 +-- !query schema +struct +-- !query output +123true,false,false + + +-- !query +SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { +"error" : "','", +"hint" : "" + } +} + + +-- !query +SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES", + "sqlState" : "42K09", + "messageParameters" : { +"dataType" : "(\"BINARY\" or \"STRING\")", 
+"functionName" : "`listagg`", +"sqlExpr" : "\"listagg(c1, , )\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIndex" : 24, +"fragment" : "listagg(c1, ', ')" + } ] +} + + +-- !query +SELECT listagg(b, a) FROM df GROUP BY a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT", + "sqlState" : "42K09", + "messageParameters" : { +"inputExpr" : "\"a\"", +"inputName" : "`delimiter`", +"inputType" : "\"STRING\"", +"sqlExpr" : "\"listagg(b, a)\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIndex" : 20, +"fragment" : "listagg(b, a)" + } ] +} + + +-- !query +SELECT listagg(a) OVER (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +NULL +aa +aa +aabb +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + "sqlState" : "42601", + "messageParameters" : { +"aggFunc" : "\"listagg(a, NULL, a)\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIndex"
Re: [PR] [SPARK-49490][SQL] Add benchmarks for initCap [spark]
stevomitric commented on code in PR #48501: URL: https://github.com/apache/spark/pull/48501#discussion_r1843598536 ## sql/core/benchmarks/CollationBenchmark-jdk21-results.txt: ## @@ -1,54 +1,88 @@ -OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws +Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative time -- -UTF8_BINARY 1353 1357 5 0.1 13532.2 1.0X -UTF8_LCASE 2601 2602 2 0.0 26008.0 1.9X -UNICODE 16745 16756 16 0.0 167450.9 12.4X -UNICODE_CI 16590 16627 52 0.0 165904.8 12.3X +UTF8_BINARY 2220 2223 5 0.0 22197.0 1.0X +UTF8_LCASE 4949 4950 2 0.0 49488.1 2.2X +UNICODE 28172 28198 36 0.0 281721.0 12.7X +UNICODE_CI 28233 28308 106 0.0 282328.2 12.7X -OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws +Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative time --- -UTF8_BINARY 1746 1746 0 0.1 17462.6 1.0X -UTF8_LCASE2629 2630 1 0.0 26294.8 1.5X -UNICODE 16744 16744 0 0.0 167438.6 9.6X -UNICODE_CI 16518 16521 4 0.0 165180.2 9.5X +UTF8_BINARY 2731 2733 2 0.0 27313.6 1.0X +UTF8_LCASE4611 4619 11 0.0 46111.4 1.7X +UNICODE 28149 28211 88 0.0 281486.8 10.3X +UNICODE_CI 27535 27597 89 0.0 275348.4 10.1X -OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws +Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative time -UTF8_BINARY2808 2808 1 0.0 28076.2 1.0X -UTF8_LCASE 5409 5410 0 0.0 54093.0 1.9X -UNICODE 67930 67957 38 0.0 679296.7 24.2X -UNICODE_CI56004 56005 1 0.0 560044.2 19.9X +UTF8_BINARY4603 4618 22 0.0 46031.3 1.0X +UTF8_LCASE 9510 9518 11 0.0 95097.7 2.1X +UNICODE 135718 135786 97 0.0 1357176.2 29.5X +UNICODE_CI 113715 113819 148 0.0 1137145.8 24.7X -OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws +Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative time
[PR] [SPARK-50328][INFRA] Add a separate docker file for SparkR [spark]
zhengruifeng opened a new pull request, #48859: URL: https://github.com/apache/spark/pull/48859

### What changes were proposed in this pull request?
Add a separate docker file for SparkR

### Why are the changes needed?
For env isolation

### Does this PR introduce _any_ user-facing change?
No, infra-only

### How was this patch tested?
CI

### Was this patch authored or co-authored using generative AI tooling?
No
Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]
cloud-fan commented on code in PR #48847: URL: https://github.com/apache/spark/pull/48847#discussion_r1843600908

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala:
@@ -189,7 +189,8 @@ object BindParameters extends ParameterizedQueryProcessor with QueryErrorsBase {
     // We should wait for `CTESubstitution` to resolve CTE before binding parameters, as CTE
     // relations are not children of `UnresolvedWith`.

Review Comment: +1
Re: [PR] [SPARK-50130][SQL][FOLLOWUP] Make Encoder generation lazy [spark]
cloud-fan commented on code in PR #48829: URL: https://github.com/apache/spark/pull/48829#discussion_r1843600376

## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
@@ -95,13 +95,12 @@ private[sql] object Dataset {
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame =
     sparkSession.withActive {
       val qe = sparkSession.sessionState.executePlan(logicalPlan)
-      val encoder = if (qe.isLazyAnalysis) {
-        RowEncoder.encoderFor(new StructType())
+      if (qe.isLazyAnalysis) {

Review Comment: I don't get it. My proposal is a pure code cleanup that doesn't change any actual logic.
Re: [PR] [MINOR] Fix code style for if/for/while statements [spark]
MaxGekk closed pull request #48425: [MINOR] Fix code style for if/for/while statements URL: https://github.com/apache/spark/pull/48425
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
cloud-fan commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843593480 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -265,3 +271,257 @@ private[aggregate] object CollectTopK { case _ => throw QueryCompilationErrors.invalidNumParameter(e) } } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ +_FUNC_(expr[, delimiter])[ WITHIN GROUP (ORDER BY key [ASC | DESC] [,...])] - Returns +the concatenation of non-null input values, separated by the delimiter ordered by key. +If all values are null, null is returned. +""", + arguments = """ +Arguments: + * expr - a string or binary expression to be concatenated. + * delimiter - an optional string or binary foldable expression used to separate the input values. +If null, the concatenation will be performed without a delimiter. Default is null. + * key - an optional expression for ordering the input values. Multiple keys can be specified. +If none are specified, the order of the rows in the result is non-deterministic. + """, + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + abc + > SELECT _FUNC_(col) WITHIN GROUP (ORDER BY col DESC) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + cba + > SELECT _FUNC_(col) FROM VALUES ('a'), (NULL), ('b') AS tab(col); + ab + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + aa + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + ab + > SELECT _FUNC_(col, ', ') FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a, b, c + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + note = """ +* If the order is not specified, the function is non-deterministic because +the order of the rows may be non-deterministic after a shuffle. +* If DISTINCT is specified, then expr and key must be the same expression. 
+ """, + group = "agg_funcs", + since = "4.0.0" +) +// scalastyle:on line.size.limit +case class ListAgg( +child: Expression, +delimiter: Expression = Literal(null), +orderExpressions: Seq[SortOrder] = Nil, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends Collect[mutable.ArrayBuffer[Any]] + with SupportsOrderingWithinGroup + with ImplicitCastInputTypes { + + override def isOrderingMandatory: Boolean = false + override def isDistinctSupported: Boolean = true + override protected lazy val bufferElementType: DataType = { +if (noNeedSaveOrderValue) { + child.dataType +} else { + StructType( +StructField("value", child.dataType) ++: orderValuesField + ) +} + } + /** Indicates that the result of [[child]] is enough for evaluation */ + private lazy val noNeedSaveOrderValue: Boolean = isOrderCompatible(orderExpressions) + + def this(child: Expression) = +this(child, Literal(null), Nil, 0, 0) + + def this(child: Expression, delimiter: Expression) = +this(child, delimiter, Nil, 0, 0) + + override def nullable: Boolean = true + + override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty + + override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate = +copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = +copy(inputAggBufferOffset = newInputAggBufferOffset) + + override def defaultResult: Option[Literal] = Option(Literal.create(null, dataType)) + + override def sql(isDistinct: Boolean): String = { +val distinct = if (isDistinct) "DISTINCT " else "" +val withinGroup = if (orderingFilled) { + s" WITHIN GROUP (ORDER BY ${orderExpressions.map(_.sql).mkString(", ")})" +} else { + "" +} +s"$prettyName($distinct${child.sql}, ${delimiter.sql})$withinGroup" + } + + override def inputTypes: Seq[AbstractDataType] = +TypeCollection( + StringTypeWithCollation(supportsTrimCollation = true), + BinaryType +) +: +TypeCollection( + StringTypeWithCollation(supportsTrimCollation = true), + BinaryType, + NullType +) +: +orderExpressions.map(_ => AnyDataType) + + override def checkInputDataTypes(): TypeCheckResult = { +val matchInputTypes = super.checkInputDataTypes() +if (matchInputTypes.isFailure) { + return matchInputTypes +} +if (!delimiter.foldable) { + return DataTypeMismatch( +errorSubClass = "NON_FOLDABLE_INPUT", +messageParameters = Map( + "inputName" -> toSQLId("delimiter"), + "inputType" -> toSQLType(delimiter.dataType), + "inputExpr" -> toSQLExpr(delimiter) +) + ) +} +if (delimiter.dataType ==
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
cloud-fan commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843593193 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala: ## @@ -265,3 +271,257 @@ private[aggregate] object CollectTopK { case _ => throw QueryCompilationErrors.invalidNumParameter(e) } } + +// scalastyle:off line.size.limit +@ExpressionDescription( + usage = """ +_FUNC_(expr[, delimiter])[ WITHIN GROUP (ORDER BY key [ASC | DESC] [,...])] - Returns +the concatenation of non-null input values, separated by the delimiter ordered by key. +If all values are null, null is returned. +""", + arguments = """ +Arguments: + * expr - a string or binary expression to be concatenated. + * delimiter - an optional string or binary foldable expression used to separate the input values. +If null, the concatenation will be performed without a delimiter. Default is null. + * key - an optional expression for ordering the input values. Multiple keys can be specified. +If none are specified, the order of the rows in the result is non-deterministic. + """, + examples = """ +Examples: + > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + abc + > SELECT _FUNC_(col) WITHIN GROUP (ORDER BY col DESC) FROM VALUES ('a'), ('b'), ('c') AS tab(col); + cba + > SELECT _FUNC_(col) FROM VALUES ('a'), (NULL), ('b') AS tab(col); + ab + > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col); + aa + > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS tab(col); + ab + > SELECT _FUNC_(col, ', ') FROM VALUES ('a'), ('b'), ('c') AS tab(col); + a, b, c + > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col); + NULL + """, + note = """ +* If the order is not specified, the function is non-deterministic because +the order of the rows may be non-deterministic after a shuffle. +* If DISTINCT is specified, then expr and key must be the same expression. 
+ """, + group = "agg_funcs", + since = "4.0.0" +) +// scalastyle:on line.size.limit +case class ListAgg( +child: Expression, +delimiter: Expression = Literal(null), +orderExpressions: Seq[SortOrder] = Nil, +mutableAggBufferOffset: Int = 0, +inputAggBufferOffset: Int = 0) + extends Collect[mutable.ArrayBuffer[Any]] + with SupportsOrderingWithinGroup + with ImplicitCastInputTypes { + + override def isOrderingMandatory: Boolean = false + override def isDistinctSupported: Boolean = true + override protected lazy val bufferElementType: DataType = { +if (noNeedSaveOrderValue) { + child.dataType +} else { + StructType( +StructField("value", child.dataType) ++: orderValuesField + ) +} + } + /** Indicates that the result of [[child]] is enough for evaluation */ + private lazy val noNeedSaveOrderValue: Boolean = isOrderCompatible(orderExpressions) + + def this(child: Expression) = +this(child, Literal(null), Nil, 0, 0) + + def this(child: Expression, delimiter: Expression) = +this(child, delimiter, Nil, 0, 0) + + override def nullable: Boolean = true + + override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty + + override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): ImperativeAggregate = +copy(mutableAggBufferOffset = newMutableAggBufferOffset) + + override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): ImperativeAggregate = +copy(inputAggBufferOffset = newInputAggBufferOffset) + + override def defaultResult: Option[Literal] = Option(Literal.create(null, dataType)) + + override def sql(isDistinct: Boolean): String = { +val distinct = if (isDistinct) "DISTINCT " else "" +val withinGroup = if (orderingFilled) { + s" WITHIN GROUP (ORDER BY ${orderExpressions.map(_.sql).mkString(", ")})" +} else { + "" +} +s"$prettyName($distinct${child.sql}, ${delimiter.sql})$withinGroup" + } + + override def inputTypes: Seq[AbstractDataType] = +TypeCollection( + StringTypeWithCollation(supportsTrimCollation = true), + BinaryType +) +: +TypeCollection( + StringTypeWithCollation(supportsTrimCollation = true), + BinaryType, + NullType +) +: +orderExpressions.map(_ => AnyDataType) + + override def checkInputDataTypes(): TypeCheckResult = { +val matchInputTypes = super.checkInputDataTypes() +if (matchInputTypes.isFailure) { + return matchInputTypes +} +if (!delimiter.foldable) { + return DataTypeMismatch( +errorSubClass = "NON_FOLDABLE_INPUT", +messageParameters = Map( + "inputName" -> toSQLId("delimiter"), + "inputType" -> toSQLType(delimiter.dataType), + "inputExpr" -> toSQLExpr(delimiter) +) + ) +} +if (delimiter.dataType ==
Re: [PR] [SPARK-50327][SQL] Factor out function resolution to be reused in the single-pass Analyzer [spark]
MaxGekk closed pull request #48858: [SPARK-50327][SQL] Factor out function resolution to be reused in the single-pass Analyzer URL: https://github.com/apache/spark/pull/48858
Re: [PR] [SPARK-50327][SQL] Factor out function resolution to be reused in the single-pass Analyzer [spark]
MaxGekk commented on code in PR #48858: URL: https://github.com/apache/spark/pull/48858#discussion_r1843958488 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala: ## @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.catalog.{ + CatalogManager, + CatalogV2Util, + FunctionCatalog, + Identifier, + LookupCatalog +} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.connector.catalog.functions.{ + AggregateFunction => V2AggregateFunction, + ScalarFunction +} +import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryCompilationErrors} +import org.apache.spark.sql.types._ + +class FunctionResolution( +override val catalogManager: CatalogManager, +relationResolution: RelationResolution) +extends DataTypeErrorsBase with LookupCatalog { + private val v1SessionCatalog = catalogManager.v1SessionCatalog + + private val trimWarningEnabled = new AtomicBoolean(true) Review Comment: sgtm -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50327][SQL] Factor out function resolution to be reused in the single-pass Analyzer [spark]
MaxGekk commented on PR #48858: URL: https://github.com/apache/spark/pull/48858#issuecomment-2479096472 +1, LGTM. Merging to master. Thank you, @vladimirg-db.
Re: [PR] [SPARK-50320][CORE] Make `--remote` an official option by removing `experimental` warning [spark]
dongjoon-hyun closed pull request #48850: [SPARK-50320][CORE] Make `--remote` an official option by removing `experimental` warning URL: https://github.com/apache/spark/pull/48850
Re: [PR] [SPARK-50320][CORE] Make `--remote` an official option by removing `experimental` warning [spark]
dongjoon-hyun commented on PR #48850: URL: https://github.com/apache/spark/pull/48850#issuecomment-2479114901 Thank you, @yaooqinn! Merged to master for Apache Spark 4.0.0 on February 2025.
Re: [PR] [WIP] Subquery only Spark Connect. [spark]
ueshin closed pull request #48863: [WIP] Subquery only Spark Connect. URL: https://github.com/apache/spark/pull/48863
[PR] [WIP] Subquery only Spark Connect. [spark]
ueshin opened a new pull request, #48864: URL: https://github.com/apache/spark/pull/48864

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]
ahshahid commented on code in PR #48252: URL: https://github.com/apache/spark/pull/48252#discussion_r1844712588

## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
@@ -137,8 +137,16 @@ class SparkSession private[sql] (

   /** @inheritdoc */
   def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame = {
-    val encoder = JavaTypeInference.encoderFor(beanClass.asInstanceOf[Class[Any]])
-    createDataset(encoder, data.iterator().asScala).toDF()
+    JavaTypeInference.setSparkClientFlag()
+    val encoderTry = Try {
+      JavaTypeInference.encoderFor(beanClass.asInstanceOf[Class[Any]])
+    }
+    JavaTypeInference.unsetSparkClientFlag()

Review Comment:
Sure. The thing is that @hvanhovell mentioned that KryoSerializer is not supported for the Spark Connect client. If the class for which the Encoder is being created implements KryoSerializable, then a Kryo-based Encoder is preferred over a Java Serializable-based Encoder. But if the thread creating the encoder originates from a Connect client, the Kryo-based encoder should not be created; a Java Serializable-based encoder should be created instead. Rather than adding an extra function parameter, the information is passed using a thread local, and once the encoder is created, the thread local is unset.
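A minimal sketch of the thread-local flag pattern described here; the object and method names are hypothetical stand-ins rather than the actual `JavaTypeInference` members, and a try/finally is used so the flag is cleared even if inference throws.

```scala
// Hypothetical names; this only illustrates the set -> infer -> unset discipline.
object EncoderInferenceFlags {
  private val fromConnectClient: ThreadLocal[Boolean] = new ThreadLocal[Boolean] {
    override def initialValue(): Boolean = false
  }

  def markConnectClient(): Unit = fromConnectClient.set(true)
  def clearConnectClient(): Unit = fromConnectClient.remove()

  // Encoder inference can consult this to avoid picking a Kryo-based encoder
  // when the request originates from a Spark Connect client.
  def isConnectClient: Boolean = fromConnectClient.get()
}

object ConnectClientEncoderSupport {
  // Wraps an inference call so the thread-local flag is always reset.
  def withConnectClientFlag[T](infer: => T): T = {
    EncoderInferenceFlags.markConnectClient()
    try infer
    finally EncoderInferenceFlags.clearConnectClient()
  }
}
```

Compared with paired set/unset calls, the try/finally wrapper also clears the flag when encoder inference fails.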
Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]
ahshahid commented on code in PR #48252: URL: https://github.com/apache/spark/pull/48252#discussion_r1844719865

## sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala:
@@ -148,34 +163,180 @@ object JavaTypeInference {
     // TODO: we should only collect properties that have getter and setter. However, some tests
     // pass in scala case class as java bean class which doesn't have getter and setter.
     val properties = getJavaBeanReadableProperties(c)
-    // add type variables from inheritance hierarchy of the class
-    val classTV = JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap ++
-      typeVariables
-    // Note that the fields are ordered by name.
-    val fields = properties.map { property =>
-      val readMethod = property.getReadMethod
-      val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet + c, classTV)
-      // The existence of `javax.annotation.Nonnull`, means this field is not nullable.
-      val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
-      EncoderField(
-        property.getName,
-        encoder,
-        encoder.nullable && !hasNonNull,
-        Metadata.empty,
-        Option(readMethod.getName),
-        Option(property.getWriteMethod).map(_.getName))
+
+    // if the properties is empty and this is not a top level enclosing class, then we
+    // should not consider class as bean, as otherwise it will be treated as empty schema
+    // and loose the data on deser.

Review Comment:
Let's say the top-level class for which the encoder is being created has a field x which is a POJO but has no bean-type getters. That means the schema corresponding to field x is empty. So when the Dataset of the top-level class is converted to a DataFrame, there is no representation of x in the Row object. When that DataFrame is converted back to a Dataset, the POJO field x is set to null and there is data loss: it was NOT NULL when we started, and it became null because the schema was empty. To handle that case, a POJO without getters should be represented as BinaryType, so that when the DataFrame is converted back, field x gets the deserialized POJO. The reason this is not done for the top-level class is that existing tests assert that a top-level class with no getters should produce an empty schema, implying 0 rows and no schema. Whether that is desirable, or whether it should also be represented as a binary type, is debatable, since in any case no meaningful SQL operation can be done on binary data. So a distinction is made using the boolean: a top-level class with no getters needs to be treated differently from a field whose type has no getters.
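To illustrate the scenario, a small hedged sketch: `Payload` below exposes no bean-style getters, so its inferred schema collapses to empty before this change; per the behavior described above, the change makes such a field surface as `BinaryType` instead. The class names are made up for illustration.

```scala
import scala.beans.BeanProperty

import org.apache.spark.sql.Encoders

// A field type with no bean-style getters/setters: on its own it yields an empty schema.
class Payload(val data: String) extends Serializable

// A top-level bean whose only property is the getter-less Payload.
class Wrapper extends Serializable {
  @BeanProperty var payload: Payload = _
}

object EmptyBeanFieldSketch extends App {
  val enc = Encoders.bean(classOf[Wrapper])
  // Before the change: `payload` contributes an empty struct, so round-tripping the
  // Dataset through a DataFrame loses the value. With the change described above,
  // the field is expected to be encoded as BinaryType (serialized bytes) instead.
  enc.schema.printTreeString()
}
```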
Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]
ahshahid commented on code in PR #48252: URL: https://github.com/apache/spark/pull/48252#discussion_r1844722744 ## sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala: ## @@ -148,34 +163,180 @@ object JavaTypeInference { // TODO: we should only collect properties that have getter and setter. However, some tests // pass in scala case class as java bean class which doesn't have getter and setter. val properties = getJavaBeanReadableProperties(c) - // add type variables from inheritance hierarchy of the class - val classTV = JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap ++ -typeVariables - // Note that the fields are ordered by name. - val fields = properties.map { property => -val readMethod = property.getReadMethod -val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet + c, classTV) -// The existence of `javax.annotation.Nonnull`, means this field is not nullable. -val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull]) -EncoderField( - property.getName, - encoder, - encoder.nullable && !hasNonNull, - Metadata.empty, - Option(readMethod.getName), - Option(property.getWriteMethod).map(_.getName)) + + // if the properties is empty and this is not a top level enclosing class, then we + // should not consider class as bean, as otherwise it will be treated as empty schema + // and loose the data on deser. + if (properties.isEmpty && seenTypeSet.nonEmpty) { +findBestEncoder(Seq(c), seenTypeSet, typeVariables, None, serializableEncodersOnly = true) Review Comment: will revert on this.. by going through the code. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]
ahshahid commented on code in PR #48252: URL: https://github.com/apache/spark/pull/48252#discussion_r1844722897 ## sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala: ## @@ -2802,6 +2821,79 @@ class DatasetSuite extends QueryTest } } } + + test("SPARK-49789 Bean class encoding with generic type implementing Serializable") { +// just create encoder +val enc = Encoders.bean(classOf[MessageWrapper[_]]) +val data = Seq("test1", "test2").map(str => { + val msg = new MessageWrapper[String]() + msg.setMessage(str) + msg +}) +validateParamBeanDataset(classOf[MessageWrapper[String]], + data, mutable.Buffer(data: _*), + StructType(Seq(StructField("message", BinaryType, true))) +) + } + + test("SPARK-49789 Bean class encoding with generic type indirectly extending" + +" Serializable class") { +// just create encoder +Encoders.bean(classOf[BigDecimalMessageWrapper[_]]) +val data = Seq(2d, 8d).map(doub => { + val bean = new BigDecimalMessageWrapper[DerivedBigDecimalExtender]() + bean.setMessage(new DerivedBigDecimalExtender(doub)) + bean +}) +validateParamBeanDataset( + classOf[BigDecimalMessageWrapper[DerivedBigDecimalExtender]], + data, mutable.Buffer(data: _*), + StructType(Seq(StructField("message", BinaryType, true + } + + test("SPARK-49789. test bean class with generictype bound of UDTType") { +// just create encoder +UDTRegistration.register(classOf[TestUDT].getName, classOf[TestUDTType].getName) +val enc = Encoders.bean(classOf[UDTBean[_]]) +val baseData = Seq((1, "a"), (2, "b")) +val data = baseData.map(tup => { + val bean = new UDTBean[TestUDT]() + bean.setMessage(new TestUDTImplSub(tup._1, tup._2)) + bean +}) +val expectedData = baseData.map(tup => { + val bean = new UDTBean[TestUDT]() + bean.setMessage(new TestUDTImpl(tup._1, tup._2)) + bean +}) +validateParamBeanDataset( + classOf[UDTBean[TestUDT]], + data, mutable.Buffer(expectedData: _*), + StructType(Seq(StructField("message", new TestUDTType(), true + } + + private def validateParamBeanDataset[T]( + classToEncode: Class[T], + data: Seq[T], + expectedData: mutable.Buffer[T], + expectedSchema: StructType): Unit = { + Review Comment: thanks. will do that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [MINOR] Use LinkedHashSet for ResolveLateralColumnAliasReference to generate stable hash for the plan [spark]
github-actions[bot] commented on PR #47571: URL: https://github.com/apache/spark/pull/47571#issuecomment-2480202800 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] Enhance the metrics in SparkUI with logical plan stats [spark]
github-actions[bot] closed pull request #47534: Enhance the metrics in SparkUI with logical plan stats URL: https://github.com/apache/spark/pull/47534 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] Fixed comma splice in cluster-overview.md [spark]
github-actions[bot] commented on PR #47615: URL: https://github.com/apache/spark/pull/47615#issuecomment-2480202795 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]
ahshahid commented on code in PR #48252: URL: https://github.com/apache/spark/pull/48252#discussion_r1844724224 ## sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala: ## @@ -148,34 +163,180 @@ object JavaTypeInference { // TODO: we should only collect properties that have getter and setter. However, some tests // pass in scala case class as java bean class which doesn't have getter and setter. val properties = getJavaBeanReadableProperties(c) - // add type variables from inheritance hierarchy of the class - val classTV = JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap ++ -typeVariables - // Note that the fields are ordered by name. - val fields = properties.map { property => -val readMethod = property.getReadMethod -val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet + c, classTV) -// The existence of `javax.annotation.Nonnull`, means this field is not nullable. -val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull]) -EncoderField( - property.getName, - encoder, - encoder.nullable && !hasNonNull, - Metadata.empty, - Option(readMethod.getName), - Option(property.getWriteMethod).map(_.getName)) + + // if the properties is empty and this is not a top level enclosing class, then we + // should not consider class as bean, as otherwise it will be treated as empty schema + // and loose the data on deser. + if (properties.isEmpty && seenTypeSet.nonEmpty) { +findBestEncoder(Seq(c), seenTypeSet, typeVariables, None, serializableEncodersOnly = true) + .getOrElse(throw ExecutionErrors.cannotFindEncoderForTypeError(t.getTypeName)) + } else { +// add type variables from inheritance hierarchy of the class +val parentClassesTypeMap = + JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap +val classTV = parentClassesTypeMap ++ typeVariables +// Note that the fields are ordered by name. +val fields = properties.map { property => + val readMethod = property.getReadMethod + val methodReturnType = readMethod.getGenericReturnType + val encoder = encoderFor(methodReturnType, seenTypeSet + c, classTV) + // The existence of `javax.annotation.Nonnull`, means this field is not nullable. + val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull]) + EncoderField( +property.getName, +encoder, +encoder.nullable && !hasNonNull, +Metadata.empty, +Option(readMethod.getName), +Option(property.getWriteMethod).map(_.getName)) +} +// implies it cannot be assumed a BeanClass. +// Check if its super class or interface could be represented by an Encoder Review Comment: let me go through. will get back. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48835] Introduce versoning to jdbc connectors [spark]
github-actions[bot] commented on PR #47181: URL: https://github.com/apache/spark/pull/47181#issuecomment-2480202821 We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
bogao007 commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1844724391 ## python/pyspark/sql/streaming/stateful_processor.py: ## @@ -420,10 +411,27 @@ def handleInputRows( timer_values: TimerValues Timer value for the current batch that process the input rows. Users can get the processing or event time timestamp from TimerValues. +""" +return iter([]) Review Comment: Why do we change the `...` placeholder here? ## python/pyspark/sql/streaming/stateful_processor.py: ## @@ -420,10 +411,27 @@ def handleInputRows( timer_values: TimerValues Timer value for the current batch that process the input rows. Users can get the processing or event time timestamp from TimerValues. +""" +return iter([]) + +def handleExpiredTimer( Review Comment: Just double check that this method is not required for users to implement, correct? ## python/pyspark/sql/pandas/group_ops.py: ## @@ -573,7 +568,16 @@ def transformWithStateUDF( statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.CLOSED) return iter([]) -result = handle_data_with_timers(statefulProcessorApiClient, key, inputRows) +if timeMode != "none": +batch_timestamp = statefulProcessorApiClient.get_batch_timestamp() +watermark_timestamp = statefulProcessorApiClient.get_watermark_timestamp() +else: +batch_timestamp = -1 +watermark_timestamp = -1 Review Comment: Can we abstract this as a separate method and share in both UDFs to reduce redundant code? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]
ahshahid commented on code in PR #48252: URL: https://github.com/apache/spark/pull/48252#discussion_r1844723692 ## sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala: ## @@ -2909,6 +3016,7 @@ object KryoData { /** Used to test Java encoder. */ class JavaData(val a: Int) extends Serializable { + def this() = this(0) Review Comment: Any class which is Serializable needs a default no-argument constructor for the serializer to work correctly; otherwise there will be a runtime exception. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49676][SS][PYTHON] Add Support for Chaining of Operators in transformWithStateInPandas API [spark]
jingz-db commented on PR #48124: URL: https://github.com/apache/spark/pull/48124#issuecomment-2480233401 Hey @HeartSaVioR, could you take another look? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48769][SQL] Support constant folding for ScalaUDF [spark]
github-actions[bot] closed pull request #47164: [SPARK-48769][SQL] Support constant folding for ScalaUDF URL: https://github.com/apache/spark/pull/47164 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]
ahshahid commented on code in PR #48252: URL: https://github.com/apache/spark/pull/48252#discussion_r1844713507 ## sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala: ## @@ -148,34 +163,180 @@ object JavaTypeInference { // TODO: we should only collect properties that have getter and setter. However, some tests // pass in scala case class as java bean class which doesn't have getter and setter. val properties = getJavaBeanReadableProperties(c) - // add type variables from inheritance hierarchy of the class - val classTV = JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap ++ -typeVariables - // Note that the fields are ordered by name. - val fields = properties.map { property => -val readMethod = property.getReadMethod -val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet + c, classTV) -// The existence of `javax.annotation.Nonnull`, means this field is not nullable. -val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull]) -EncoderField( - property.getName, - encoder, - encoder.nullable && !hasNonNull, - Metadata.empty, - Option(readMethod.getName), - Option(property.getWriteMethod).map(_.getName)) + + // if the properties is empty and this is not a top level enclosing class, then we + // should not consider class as bean, as otherwise it will be treated as empty schema + // and loose the data on deser. + if (properties.isEmpty && seenTypeSet.nonEmpty) { +findBestEncoder(Seq(c), seenTypeSet, typeVariables, None, serializableEncodersOnly = true) + .getOrElse(throw ExecutionErrors.cannotFindEncoderForTypeError(t.getTypeName)) + } else { +// add type variables from inheritance hierarchy of the class +val parentClassesTypeMap = + JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap +val classTV = parentClassesTypeMap ++ typeVariables +// Note that the fields are ordered by name. +val fields = properties.map { property => + val readMethod = property.getReadMethod + val methodReturnType = readMethod.getGenericReturnType + val encoder = encoderFor(methodReturnType, seenTypeSet + c, classTV) + // The existence of `javax.annotation.Nonnull`, means this field is not nullable. + val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull]) + EncoderField( +property.getName, +encoder, +encoder.nullable && !hasNonNull, +Metadata.empty, +Option(readMethod.getName), +Option(property.getWriteMethod).map(_.getName)) +} +// implies it cannot be assumed a BeanClass. 
+// Check if its super class or interface could be represented by an Encoder + +JavaBeanEncoder(ClassTag(c), fields.toImmutableArraySeq) } - JavaBeanEncoder(ClassTag(c), fields.toImmutableArraySeq) case _ => throw ExecutionErrors.cannotFindEncoderForTypeError(t.toString) } + private def createUDTEncoderUsingAnnotation(c: Class[_]): UDTEncoder[Any] = { +val udt = c + .getAnnotation(classOf[SQLUserDefinedType]) + .udt() + .getConstructor() + .newInstance() + .asInstanceOf[UserDefinedType[Any]] +val udtClass = udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt() +UDTEncoder(udt, udtClass) + } + + private def createUDTEncoderUsingRegistration(c: Class[_]): UDTEncoder[Any] = { +val udt = UDTRegistration + .getUDTFor(c.getName) + .get + .getConstructor() + .newInstance() + .asInstanceOf[UserDefinedType[Any]] +UDTEncoder(udt, udt.getClass) + } + def getJavaBeanReadableProperties(beanClass: Class[_]): Array[PropertyDescriptor] = { val beanInfo = Introspector.getBeanInfo(beanClass) beanInfo.getPropertyDescriptors .filterNot(_.getName == "class") .filterNot(_.getName == "declaringClass") .filter(_.getReadMethod != null) } + + private def findBestEncoder( + typesToCheck: Seq[Class[_]], + seenTypeSet: Set[Class[_]], + typeVariables: Map[TypeVariable[_], Type], + baseClass: Option[Class[_]], + serializableEncodersOnly: Boolean = false): Option[AgnosticEncoder[_]] = +if (serializableEncodersOnly) { + val isClientConnect = clientConnectFlag.get + assert(typesToCheck.size == 1) + typesToCheck +.flatMap(c => { + if (!isClientConnect && classOf[KryoSerializable].isAssignableFrom(c)) { Review Comment: Will do. The explanation is provided in the previous comment's response. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: rev
Re: [PR] [SPARK-50324][PYTHON][CONNECT] Make `createDataFrame` trigger `Config` RPC at most once [spark]
HyukjinKwon commented on code in PR #48856: URL: https://github.com/apache/spark/pull/48856#discussion_r1844745641 ## python/pyspark/sql/connect/session.py: ## @@ -706,9 +724,9 @@ def createDataFrame( else: local_relation = LocalRelation(_table) -cache_threshold = self._client.get_configs("spark.sql.session.localRelationCacheThreshold") +cache_threshold = conf_getter["spark.sql.session.localRelationCacheThreshold"] Review Comment: Shall we just get all the confs in batch eagerly? Seems like we should get the conf once anyway. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50324][PYTHON][CONNECT] Make `createDataFrame` trigger `Config` RPC at most once [spark]
zhengruifeng commented on code in PR #48856: URL: https://github.com/apache/spark/pull/48856#discussion_r1844867144 ## python/pyspark/sql/connect/session.py: ## @@ -706,9 +724,9 @@ def createDataFrame( else: local_relation = LocalRelation(_table) -cache_threshold = self._client.get_configs("spark.sql.session.localRelationCacheThreshold") +cache_threshold = conf_getter["spark.sql.session.localRelationCacheThreshold"] Review Comment: There are two cases in which we don't need the configs: 1) the local data is empty and the schema is specified, so a valid empty df is returned; 2) the creation fails due to some assertions. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
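The PR code itself is Python, but the pattern under discussion (fetch all required configs with a single call, and only if one of them is actually needed) can be sketched language-agnostically; the names below are illustrative, not from the PR.

```scala
// A getter that batches the lookup of all needed keys and performs it at most once, lazily.
class ConfGetter(keys: Seq[String], fetch: Seq[String] => Map[String, String]) {
  private lazy val values: Map[String, String] = fetch(keys) // at most one RPC
  def apply(key: String): String = values(key)
}

// No RPC is issued if DataFrame creation fails (or needs no config) before the
// first apply() call; `fetchFromServer` below is a hypothetical client callback.
// val confGetter = new ConfGetter(
//   Seq("spark.sql.session.localRelationCacheThreshold"), fetchFromServer)
```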
Re: [PR] [SPARK-50130][SQL][FOLLOWUP] Make Encoder generation lazy [spark]
ueshin commented on code in PR #48829: URL: https://github.com/apache/spark/pull/48829#discussion_r1844320464 ## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala: ## @@ -95,13 +95,12 @@ private[sql] object Dataset { def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = sparkSession.withActive { val qe = sparkSession.sessionState.executePlan(logicalPlan) - val encoder = if (qe.isLazyAnalysis) { -RowEncoder.encoderFor(new StructType()) + if (qe.isLazyAnalysis) { Review Comment: The lazy encoder creation only happens when `qe.isLazyAnalysis`; otherwise the encoder is created right here, the same as before, to avoid changing the behavior. `Dataset` has a new default constructor that takes a function for the lazy case, and the original default constructor is still there to keep the existing behavior. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
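A minimal sketch of the "pass a function, build the encoder lazily" pattern described above; the names are made up and Spark's real `Dataset` constructors differ.

```scala
class EncoderHolder[E](mkEncoder: () => E) {
  // Auxiliary constructor keeps the old eager behavior: the encoder already exists.
  def this(encoder: E) = this(() => encoder)

  // Only evaluated if/when the encoder is first needed.
  lazy val encoder: E = mkEncoder()
}
```

In the eager path nothing changes; in the lazy-analysis path the potentially failing encoder construction is deferred until the result is actually used, which matches the intent of the comment.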
Re: [PR] [SPARK-50017] Support Avro encoding for TransformWithState operator [spark]
brkyvz commented on code in PR #48401: URL: https://github.com/apache/spark/pull/48401#discussion_r1844378796 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -343,7 +497,8 @@ class StatefulProcessorHandleImpl( * actually done. We need this class because we can only collect the schemas after * the StatefulProcessor is initialized. */ -class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: ExpressionEncoder[Any]) +class DriverStatefulProcessorHandleImpl( +timeMode: TimeMode, keyExprEnc: ExpressionEncoder[Any], initializeAvroEnc: Boolean) Review Comment: nit: one line per parameter please ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -718,6 +733,22 @@ object TransformWithStateExec { stateStoreCkptIds = None ) +val stateStoreEncoding = child.session.sessionState.conf.getConf( + SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT +) + +def getDriverProcessorHandle(): DriverStatefulProcessorHandleImpl = { + val driverProcessorHandle = new DriverStatefulProcessorHandleImpl( +timeMode, keyEncoder, initializeAvroEnc = + avroEncodingEnabled(stateStoreEncoding)) + driverProcessorHandle.setHandleState(StatefulProcessorHandleState.PRE_INIT) + statefulProcessor.setHandle(driverProcessorHandle) + statefulProcessor.init(outputMode, timeMode) + driverProcessorHandle +} + +val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas Review Comment: nit: maybe if you add a `withColumnFamilySchema` method, you can remove the need for this duplication and can just call it below after creating the class ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -104,7 +106,9 @@ case class TransformWithStateExec( * @return a new instance of the driver processor handle */ private def getDriverProcessorHandle(): DriverStatefulProcessorHandleImpl = { Review Comment: nit: should this method have an assertion that it is being called on the Driver? ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -596,7 +601,8 @@ case class TransformWithStateExec( private def initNewStateStoreAndProcessData( partitionId: Int, hadoopConfBroadcast: Broadcast[SerializableConfiguration]) -(f: StateStore => CompletionIterator[InternalRow, Iterator[InternalRow]]): +(f: StateStore => + CompletionIterator[InternalRow, Iterator[InternalRow]]): Review Comment: also nit: should we use type aliasing to shorten this `CompletionIterator[InternalRow, Iterator[InternalRow]]`? Like `type ResultIterator = CompletionIterator[InternalRow, Iterator[InternalRow]]` ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala: ## @@ -118,6 +122,14 @@ class StatefulProcessorHandleImpl( currState = CREATED + private def getAvroEnc(stateName: String): Option[AvroEncoder] = { +if (!schemas.contains(stateName)) { + None +} else { + schemas(stateName).avroEnc +} Review Comment: `schemas.get(stateName).map(_.avroEnc)`? 
## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -596,7 +601,8 @@ case class TransformWithStateExec( private def initNewStateStoreAndProcessData( partitionId: Int, hadoopConfBroadcast: Broadcast[SerializableConfiguration]) -(f: StateStore => CompletionIterator[InternalRow, Iterator[InternalRow]]): +(f: StateStore => + CompletionIterator[InternalRow, Iterator[InternalRow]]): Review Comment: uber nit: change necessary? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]
sririshindra commented on code in PR #48252: URL: https://github.com/apache/spark/pull/48252#discussion_r1844241460 ## sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala: ## @@ -148,34 +163,180 @@ object JavaTypeInference { // TODO: we should only collect properties that have getter and setter. However, some tests // pass in scala case class as java bean class which doesn't have getter and setter. val properties = getJavaBeanReadableProperties(c) - // add type variables from inheritance hierarchy of the class - val classTV = JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap ++ -typeVariables - // Note that the fields are ordered by name. - val fields = properties.map { property => -val readMethod = property.getReadMethod -val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet + c, classTV) -// The existence of `javax.annotation.Nonnull`, means this field is not nullable. -val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull]) -EncoderField( - property.getName, - encoder, - encoder.nullable && !hasNonNull, - Metadata.empty, - Option(readMethod.getName), - Option(property.getWriteMethod).map(_.getName)) + + // if the properties is empty and this is not a top level enclosing class, then we + // should not consider class as bean, as otherwise it will be treated as empty schema + // and loose the data on deser. + if (properties.isEmpty && seenTypeSet.nonEmpty) { +findBestEncoder(Seq(c), seenTypeSet, typeVariables, None, serializableEncodersOnly = true) + .getOrElse(throw ExecutionErrors.cannotFindEncoderForTypeError(t.getTypeName)) + } else { +// add type variables from inheritance hierarchy of the class +val parentClassesTypeMap = + JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap +val classTV = parentClassesTypeMap ++ typeVariables +// Note that the fields are ordered by name. +val fields = properties.map { property => + val readMethod = property.getReadMethod + val methodReturnType = readMethod.getGenericReturnType + val encoder = encoderFor(methodReturnType, seenTypeSet + c, classTV) + // The existence of `javax.annotation.Nonnull`, means this field is not nullable. + val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull]) + EncoderField( +property.getName, +encoder, +encoder.nullable && !hasNonNull, +Metadata.empty, +Option(readMethod.getName), +Option(property.getWriteMethod).map(_.getName)) +} +// implies it cannot be assumed a BeanClass. 
+// Check if its super class or interface could be represented by an Encoder + +JavaBeanEncoder(ClassTag(c), fields.toImmutableArraySeq) } - JavaBeanEncoder(ClassTag(c), fields.toImmutableArraySeq) case _ => throw ExecutionErrors.cannotFindEncoderForTypeError(t.toString) } + private def createUDTEncoderUsingAnnotation(c: Class[_]): UDTEncoder[Any] = { +val udt = c + .getAnnotation(classOf[SQLUserDefinedType]) + .udt() + .getConstructor() + .newInstance() + .asInstanceOf[UserDefinedType[Any]] +val udtClass = udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt() +UDTEncoder(udt, udtClass) + } + + private def createUDTEncoderUsingRegistration(c: Class[_]): UDTEncoder[Any] = { +val udt = UDTRegistration + .getUDTFor(c.getName) + .get + .getConstructor() + .newInstance() + .asInstanceOf[UserDefinedType[Any]] +UDTEncoder(udt, udt.getClass) + } + def getJavaBeanReadableProperties(beanClass: Class[_]): Array[PropertyDescriptor] = { val beanInfo = Introspector.getBeanInfo(beanClass) beanInfo.getPropertyDescriptors .filterNot(_.getName == "class") .filterNot(_.getName == "declaringClass") .filter(_.getReadMethod != null) } + + private def findBestEncoder( + typesToCheck: Seq[Class[_]], + seenTypeSet: Set[Class[_]], + typeVariables: Map[TypeVariable[_], Type], + baseClass: Option[Class[_]], + serializableEncodersOnly: Boolean = false): Option[AgnosticEncoder[_]] = +if (serializableEncodersOnly) { + val isClientConnect = clientConnectFlag.get + assert(typesToCheck.size == 1) + typesToCheck +.flatMap(c => { + if (!isClientConnect && classOf[KryoSerializable].isAssignableFrom(c)) { Review Comment: Can we add a comment explaining why isClientConnect being true disqualifies the type to be encodes using kryo Serializer. Is there a chance that will change in the future? If so, can we add a TODO statement here so that we can remove this condition if and when Kr
Re: [PR] [WIP] Subquery only Spark Connect. [spark]
ueshin closed pull request #48864: [WIP] Subquery only Spark Connect. URL: https://github.com/apache/spark/pull/48864 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [SQL][SPARK-50329] fix InSet$toString [spark]
averyqi-db opened a new pull request, #48865: URL: https://github.com/apache/spark/pull/48865 ### What changes were proposed in this pull request? Fix InSet$toString for unresolved plan node ### Why are the changes needed? InSet$toString should always work even for unresolved node ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? end to end test by running TPCDS benchmark suite with planChangeLog enabled. ### Was this patch authored or co-authored using generative AI tooling? No -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50329][SQL] fix InSet$toString [spark]
HyukjinKwon commented on code in PR #48865: URL: https://github.com/apache/spark/pull/48865#discussion_r1844738816 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala: ## @@ -609,6 +609,9 @@ case class InSet(child: Expression, hset: Set[Any]) extends UnaryExpression with require(hset != null, "hset could not be null") override def toString: String = { +if (!child.resolved) { + return s"$child INSET (values with unresolved data types)" +} Review Comment: can we have a simple test tho? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
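A sketch of the kind of simple unit test the reviewer asks for, assuming a ScalaTest-style suite; the suite placement and assertion style are assumptions, while `InSet` and `UnresolvedAttribute` are the real classes involved in the fix.

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.InSet

test("SPARK-50329: InSet.toString works on an unresolved child") {
  // toString must not throw even though the child's data type is not resolved yet.
  val inSet = InSet(UnresolvedAttribute("a"), Set(1, 2, 3))
  assert(inSet.toString.contains("INSET"))
}
```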
Re: [PR] [SPARK-50309][DOCS] Document `SQL Pipe` Syntax [spark]
gengliangwang closed pull request #48852: [SPARK-50309][DOCS] Document `SQL Pipe` Syntax URL: https://github.com/apache/spark/pull/48852 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] Subquery only Spark Connect. [spark]
ueshin opened a new pull request, #48863: URL: https://github.com/apache/spark/pull/48863 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49787][SQL] Cast between UDT and other types [spark]
HyukjinKwon commented on PR #48251: URL: https://github.com/apache/spark/pull/48251#issuecomment-2480360796 Actually I wanted to make a fix like this a long time ago, and gave up after reading the ANSI spec, because a UDT cannot be cast to any type according to it, IIRC. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
cloud-fan commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843587760 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: ## @@ -2219,21 +2219,24 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor numArgs: Int, u: UnresolvedFunction): Expression = { func match { -case owg: SupportsOrderingWithinGroup if u.isDistinct => - throw QueryCompilationErrors.distinctInverseDistributionFunctionUnsupportedError( -owg.prettyName) +case owg: SupportsOrderingWithinGroup if !owg.isDistinctSupported && u.isDistinct => +throw QueryCompilationErrors.distinctWithOrderingFunctionUnsupportedError( + owg.prettyName) case owg: SupportsOrderingWithinGroup - if !owg.orderingFilled && u.orderingWithinGroup.isEmpty => - throw QueryCompilationErrors.inverseDistributionFunctionMissingWithinGroupError( -owg.prettyName) -case owg: SupportsOrderingWithinGroup - if owg.orderingFilled && u.orderingWithinGroup.nonEmpty => - throw QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError( + if owg.isOrderingMandatory && !owg.orderingFilled && u.orderingWithinGroup.isEmpty => +throw QueryCompilationErrors.functionMissingWithinGroupError(owg.prettyName) +case owg: Mode if owg.orderingFilled && u.orderingWithinGroup.nonEmpty => Review Comment: why we only check `Mode` now? Isn't it a general check? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
cloud-fan commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843590480 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala: ## @@ -20,9 +20,28 @@ package org.apache.spark.sql.catalyst.expressions.aggregate import org.apache.spark.sql.catalyst.expressions.SortOrder /** - * The trait used to set the [[SortOrder]] after inverse distribution functions parsed. + * The trait used to set the [[SortOrder]] for supporting functions. */ -trait SupportsOrderingWithinGroup { self: AggregateFunction => Review Comment: why remove `self: AggregateFunction =>`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
cloud-fan commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843591341 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala: ## @@ -20,9 +20,28 @@ package org.apache.spark.sql.catalyst.expressions.aggregate import org.apache.spark.sql.catalyst.expressions.SortOrder /** - * The trait used to set the [[SortOrder]] after inverse distribution functions parsed. + * The trait used to set the [[SortOrder]] for supporting functions. */ -trait SupportsOrderingWithinGroup { self: AggregateFunction => - def orderingFilled: Boolean = false +trait SupportsOrderingWithinGroup { def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction + + /** Indicator that ordering was set. */ + def orderingFilled: Boolean + + /** + * Tells Analyzer that WITHIN GROUP (ORDER BY ...) is mandatory for function. + * + * @see [[QueryCompilationErrors.functionMissingWithinGroupError]] + * @see [[org.apache.spark.sql.catalyst.analysis.Analyzer]] Review Comment: this isn't very useful as Analyzer has thousands of lines of code... ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala: ## @@ -20,9 +20,28 @@ package org.apache.spark.sql.catalyst.expressions.aggregate import org.apache.spark.sql.catalyst.expressions.SortOrder /** - * The trait used to set the [[SortOrder]] after inverse distribution functions parsed. + * The trait used to set the [[SortOrder]] for supporting functions. */ -trait SupportsOrderingWithinGroup { self: AggregateFunction => - def orderingFilled: Boolean = false +trait SupportsOrderingWithinGroup { def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction + + /** Indicator that ordering was set. */ + def orderingFilled: Boolean + + /** + * Tells Analyzer that WITHIN GROUP (ORDER BY ...) is mandatory for function. + * + * @see [[QueryCompilationErrors.functionMissingWithinGroupError]] + * @see [[org.apache.spark.sql.catalyst.analysis.Analyzer]] + */ + def isOrderingMandatory: Boolean + + /** + * Tells Analyzer that DISTINCT is supported. + * The DISTINCT can conflict with order so some functions can ban it. + * + * @see [[QueryCompilationErrors.functionMissingWithinGroupError]] + * @see [[org.apache.spark.sql.catalyst.analysis.Analyzer]] Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50327][SQL] Factor out function resolution to be reused in the single-pass Analyzer [spark]
vladimirg-db commented on code in PR #48858: URL: https://github.com/apache/spark/pull/48858#discussion_r1843505454 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala: ## @@ -0,0 +1,354 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.analysis + +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.catalyst.FunctionIdentifier +import org.apache.spark.sql.catalyst.expressions._ +import org.apache.spark.sql.catalyst.expressions.aggregate._ +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan +import org.apache.spark.sql.connector.catalog.{ + CatalogManager, + CatalogV2Util, + FunctionCatalog, + Identifier, + LookupCatalog +} +import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._ +import org.apache.spark.sql.connector.catalog.functions.{ + AggregateFunction => V2AggregateFunction, + ScalarFunction +} +import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryCompilationErrors} +import org.apache.spark.sql.types._ + +class FunctionResolution( +override val catalogManager: CatalogManager, +relationResolution: RelationResolution) +extends DataTypeErrorsBase with LookupCatalog { + private val v1SessionCatalog = catalogManager.v1SessionCatalog + + private val trimWarningEnabled = new AtomicBoolean(true) Review Comment: Since this atomic boolean was in `object FunctionResolution` _inside_ `class Analyzer`, we have to move it to `class FunctionResolution`, and not to global `object FunctionResolution`. @MaxGekk please correct me if I'm wrong. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
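A small sketch of why the placement matters (illustrative names only): in Scala, an `object` nested inside a class is instantiated per enclosing instance, so moving the flag to a top-level `object` would turn it into a JVM-wide singleton, while a field of `class FunctionResolution` keeps the per-instance semantics of the original nested object.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// JVM-wide: every Analyzer would share this one flag.
object GlobalFlagsSketch {
  val trimWarningEnabled = new AtomicBoolean(true)
}

// Per instance: each FunctionResolution (and hence each Analyzer owning it)
// gets its own flag, matching the behavior of the object nested in class Analyzer.
class FunctionResolutionSketch {
  private val trimWarningEnabled = new AtomicBoolean(true)
}
```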
Re: [PR] [MINOR] Fix code style for if/for/while statements [spark]
MaxGekk commented on PR #48425: URL: https://github.com/apache/spark/pull/48425#issuecomment-2478527669 +1, LGTM. Merging to master. Thank you, @exmy. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
mikhailnik-db commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843628058 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala: ## @@ -2219,21 +2219,24 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor numArgs: Int, u: UnresolvedFunction): Expression = { func match { -case owg: SupportsOrderingWithinGroup if u.isDistinct => - throw QueryCompilationErrors.distinctInverseDistributionFunctionUnsupportedError( -owg.prettyName) +case owg: SupportsOrderingWithinGroup if !owg.isDistinctSupported && u.isDistinct => +throw QueryCompilationErrors.distinctWithOrderingFunctionUnsupportedError( + owg.prettyName) case owg: SupportsOrderingWithinGroup - if !owg.orderingFilled && u.orderingWithinGroup.isEmpty => - throw QueryCompilationErrors.inverseDistributionFunctionMissingWithinGroupError( -owg.prettyName) -case owg: SupportsOrderingWithinGroup - if owg.orderingFilled && u.orderingWithinGroup.nonEmpty => - throw QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError( + if owg.isOrderingMandatory && !owg.orderingFilled && u.orderingWithinGroup.isEmpty => +throw QueryCompilationErrors.functionMissingWithinGroupError(owg.prettyName) +case owg: Mode if owg.orderingFilled && u.orderingWithinGroup.nonEmpty => Review Comment: I described this case above: > Plus, this is a strange case, because the actual check of the number of order expressions happens in the withOrderingWithinGroup(...) implementation. This case can match only the Mode function, when the mode(expr) within group (order by expr) syntax is used. That's because mode(expr) is equivalent to mode() within group (order by expr), and here we actually just ban using the two syntaxes simultaneously. I think the code and the error message are quite misleading. If `owg.orderingFilled` is true before `withOrderingWithinGroup(u.orderingWithinGroup)` was called, it means the implementation makes some hacks to fill it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50295][INFRA] Add a script to build docs with image [spark]
pan3793 commented on code in PR #48860: URL: https://github.com/apache/spark/pull/48860#discussion_r1843718476 ## dev/spark-test-image/docs/build-docs-on-local: ## @@ -0,0 +1,68 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +if ! [ -x "$(command -v docker)" ]; then + echo "Error: Docker is not installed." >&2 + exit 1 +fi + +DOCKER_CACHE_IMG="ghcr.io/apache/spark/apache-spark-github-action-image-docs-cache:master" +REPO_OWNER="apache/spark" +IMG_TAG=$(date +%s) +IMG_NAME="apache-spark-ci-image-docs:${IMG_TAG}" +IMG_URL="$REPO_OWNER/$IMG_NAME" +DOCKER_MOUNT_SPARK_HOME="/__w/spark/spark" +BUILD_DOCS_SCRIPT_PATH="${DOCKER_MOUNT_SPARK_HOME}/dev/spark-test-image/docs/build-docs-in-container" + +FWDIR="$(cd "`dirname "${BASH_SOURCE[0]}"`"; pwd)" +SPARK_HOME="$(cd "`dirname "${BASH_SOURCE[0]}"`"/../../..; pwd)" + +# 1.Compile Spark outside the container to prepare for the next step of generating documents using the container +build/sbt -Phive -Pkinesis-asl clean unidoc package + +# 2.build container image +docker buildx build \ + --cache-from type=registry,ref="${DOCKER_CACHE_IMG}" \ + --tag "${IMG_URL}" "${FWDIR}" + +# 3.build docs on container: `error docs`, `scala doc`, `python doc`, `sql doc` +docker run \ + --mount type=bind,source="${SPARK_HOME}",target="${DOCKER_MOUNT_SPARK_HOME}" \ Review Comment: If the container is going to write files to the mounted path, please make sure permissions won't prevent the host user from accessing or deleting them. For example, if the container writes files with uid 0, the host user may have no permission to delete them. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]
cloud-fan commented on code in PR #48847: URL: https://github.com/apache/spark/pull/48847#discussion_r1843582476 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala: ## @@ -189,7 +189,8 @@ object BindParameters extends ParameterizedQueryProcessor with QueryErrorsBase { // We should wait for `CTESubstitution` to resolve CTE before binding parameters, as CTE // relations are not children of `UnresolvedWith`. Review Comment: I'm thinking of a different approach: shall we explicitly match `UnresolvedWith` here and bind parameters in the CTE relations? Then we no longer need this hack to delay parameter binding. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
mikhailnik-db commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843674767 ## sql/core/src/test/resources/sql-tests/results/listagg.sql.out: ## @@ -0,0 +1,436 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TEMP VIEW df AS +SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMP VIEW df2 AS +SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT listagg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT string_agg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b, NULL) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b) FROM df WHERE 1 != 1 +-- !query schema +struct +-- !query output +NULL + + +-- !query +SELECT listagg(b, '|') FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +b|c +c|d + + +-- !query +SELECT listagg(a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(DISTINCT a) FROM df +-- !query schema +struct +-- !query output +ab + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df +-- !query schema +struct +-- !query output +NULL +a +b +ba +ba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +b|a|b|a + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭB�� + + +-- !query +SELECT listagg(a), listagg(b, ',') FROM df2 +-- !query schema +struct +-- !query output +123true,false,false + + +-- !query +SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", Review Comment: Should be `DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE`. Fixed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50325][SQL] Factor out alias resolution to be reused in the single-pass Analyzer [spark]
MaxGekk commented on PR #48857: URL: https://github.com/apache/spark/pull/48857#issuecomment-2478677345 +1, LGTM. Merging to master. Thank you, @vladimirg-db. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50325][SQL] Factor out alias resolution to be reused in the single-pass Analyzer [spark]
MaxGekk closed pull request #48857: [SPARK-50325][SQL] Factor out alias resolution to be reused in the single-pass Analyzer URL: https://github.com/apache/spark/pull/48857 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
mikhailnik-db commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843673496 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala: ## @@ -20,9 +20,28 @@ package org.apache.spark.sql.catalyst.expressions.aggregate import org.apache.spark.sql.catalyst.expressions.SortOrder /** - * The trait used to set the [[SortOrder]] after inverse distribution functions parsed. + * The trait used to set the [[SortOrder]] for supporting functions. */ -trait SupportsOrderingWithinGroup { self: AggregateFunction => - def orderingFilled: Boolean = false +trait SupportsOrderingWithinGroup { def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): AggregateFunction + + /** Indicator that ordering was set. */ + def orderingFilled: Boolean + + /** + * Tells Analyzer that WITHIN GROUP (ORDER BY ...) is mandatory for function. + * + * @see [[QueryCompilationErrors.functionMissingWithinGroupError]] + * @see [[org.apache.spark.sql.catalyst.analysis.Analyzer]] Review Comment: In my head, the idea was to use it together with the error function to find the exact line, but the error function alone is enough. Removed -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
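For readers less familiar with this trait, here is a minimal, self-contained model of the contract under discussion: the parser builds the function without any ordering, and the Analyzer later injects the WITHIN GROUP (ORDER BY ...) keys via `withOrderingWithinGroup`, with `orderingFilled` reporting whether that has happened. This is plain Scala with illustrative names, deliberately not Spark's real `SortOrder`/`AggregateFunction` types.

```scala
// Simplified stand-ins for the real catalyst types.
final case class SortKey(column: String, ascending: Boolean = true)

trait OrderingWithinGroupSupport {
  /** Indicator that the WITHIN GROUP ordering has been set. */
  def orderingFilled: Boolean
  /** Called once the WITHIN GROUP (ORDER BY ...) clause is resolved. */
  def withOrderingWithinGroup(ordering: Seq[SortKey]): OrderingWithinGroupSupport
}

final case class ListAggLike(
    child: String,
    delimiter: String = "",
    ordering: Seq[SortKey] = Nil) extends OrderingWithinGroupSupport {
  override def orderingFilled: Boolean = ordering.nonEmpty
  override def withOrderingWithinGroup(o: Seq[SortKey]): ListAggLike = copy(ordering = o)
}

object WithinGroupDemo extends App {
  val parsed = ListAggLike("a", "|")  // parser output: no ordering yet
  val resolved = parsed.withOrderingWithinGroup(Seq(SortKey("b", ascending = false)))
  assert(!parsed.orderingFilled)
  assert(resolved.orderingFilled)
}
```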
Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]
MaxGekk closed pull request #48847: [SPARK-50322][SQL] Fix parameterized identifier in a sub-query URL: https://github.com/apache/spark/pull/48847 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]
MaxGekk commented on PR #48847: URL: https://github.com/apache/spark/pull/48847#issuecomment-2478680192 Merging to master. Thank you, @srielau and @cloud-fan for review. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50049][SQL] Support custom driver metrics in writing to v2 table [spark]
cloud-fan commented on PR #48573: URL: https://github.com/apache/spark/pull/48573#issuecomment-2478369932 thanks, merging to master! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]
MaxGekk commented on code in PR #48847: URL: https://github.com/apache/spark/pull/48847#discussion_r1843530688 ## sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala: ## @@ -741,4 +741,21 @@ class ParametersSuite extends QueryTest with SharedSparkSession with PlanTest { Row("c1")) } } + + test("SPARK-50322: parameterized identifier in a sub-query") { +withTable("tt1") { + sql("create table tt1(c1 int)") + sql("insert into tt1 values (1)") + def query(p: String): String = { +s""" + |with v1 as ( + | select * from tt1 + | where 1 = (Select * from identifier($p)) Review Comment: I uppercased all SQL keywords in the test. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
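For context, a minimal sketch of exercising a parameterized identifier inside a sub-query with the SQL keywords uppercased as agreed above. It is not the exact test added in the PR; it assumes an active SparkSession `spark`, and the table name and parameter are placeholders.

```scala
// tt1 mirrors the table created in the test snippet above.
spark.sql("CREATE TABLE tt1(c1 INT)")
spark.sql("INSERT INTO tt1 VALUES (1)")

// Named-parameter form: the :tab parameter feeds the IDENTIFIER clause in the sub-query.
val df = spark.sql(
  """
    |WITH v1 AS (
    |  SELECT * FROM tt1
    |  WHERE 1 = (SELECT * FROM IDENTIFIER(:tab))
    |)
    |SELECT * FROM v1
    |""".stripMargin,
  Map("tab" -> "tt1"))

df.show()  // expected to return the single row (1) once SPARK-50322 is in place
```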
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
mikhailnik-db commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843656733 ## sql/core/src/test/resources/sql-tests/results/listagg.sql.out: ## @@ -0,0 +1,436 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TEMP VIEW df AS +SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMP VIEW df2 AS +SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT listagg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT string_agg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b, NULL) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b) FROM df WHERE 1 != 1 +-- !query schema +struct +-- !query output +NULL + + +-- !query +SELECT listagg(b, '|') FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +b|c +c|d + + +-- !query +SELECT listagg(a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(DISTINCT a) FROM df +-- !query schema +struct +-- !query output +ab + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df +-- !query schema +struct +-- !query output +NULL +a +b +ba +ba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +b|a|b|a + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭB�� + + +-- !query +SELECT listagg(a), listagg(b, ',') FROM df2 +-- !query schema +struct +-- !query output +123true,false,false + + +-- !query +SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { +"error" : "','", +"hint" : "" + } +} + + +-- !query +SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES", + "sqlState" : "42K09", + "messageParameters" : { +"dataType" : "(\"BINARY\" or \"STRING\")", 
+"functionName" : "`listagg`", +"sqlExpr" : "\"listagg(c1, , )\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIndex" : 24, +"fragment" : "listagg(c1, ', ')" + } ] +} + + +-- !query +SELECT listagg(b, a) FROM df GROUP BY a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT", + "sqlState" : "42K09", + "messageParameters" : { +"inputExpr" : "\"a\"", +"inputName" : "`delimiter`", +"inputType" : "\"STRING\"", +"sqlExpr" : "\"listagg(b, a)\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIndex" : 20, +"fragment" : "listagg(b, a)" + } ] +} + + +-- !query +SELECT listagg(a) OVER (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +NULL +aa +aa +aabb +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + "sqlState" : "42601", + "messageParameters" : { +"aggFunc" : "\"listagg(a, NULL, a)\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIn
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
mikhailnik-db commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843655281 ## sql/core/src/test/resources/sql-tests/results/listagg.sql.out: ## @@ -0,0 +1,436 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TEMP VIEW df AS +SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMP VIEW df2 AS +SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT listagg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT string_agg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b, NULL) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b) FROM df WHERE 1 != 1 +-- !query schema +struct +-- !query output +NULL + + +-- !query +SELECT listagg(b, '|') FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +b|c +c|d + + +-- !query +SELECT listagg(a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(DISTINCT a) FROM df +-- !query schema +struct +-- !query output +ab + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df +-- !query schema +struct +-- !query output +NULL +a +b +ba +ba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +b|a|b|a + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭB�� + + +-- !query +SELECT listagg(a), listagg(b, ',') FROM df2 +-- !query schema +struct +-- !query output +123true,false,false + + +-- !query +SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { +"error" : "','", +"hint" : "" + } +} + + +-- !query +SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES", + "sqlState" : "42K09", + "messageParameters" : { +"dataType" : "(\"BINARY\" or \"STRING\")", 
+"functionName" : "`listagg`", +"sqlExpr" : "\"listagg(c1, , )\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIndex" : 24, +"fragment" : "listagg(c1, ', ')" + } ] +} + + +-- !query +SELECT listagg(b, a) FROM df GROUP BY a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT", + "sqlState" : "42K09", + "messageParameters" : { +"inputExpr" : "\"a\"", +"inputName" : "`delimiter`", +"inputType" : "\"STRING\"", +"sqlExpr" : "\"listagg(b, a)\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIndex" : 20, +"fragment" : "listagg(b, a)" + } ] +} + + +-- !query +SELECT listagg(a) OVER (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +NULL +aa +aa +aabb +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + "sqlState" : "42601", + "messageParameters" : { +"aggFunc" : "\"listagg(a, NULL, a)\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIn
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
mikhailnik-db commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843664606 ## sql/core/src/test/resources/sql-tests/results/listagg.sql.out: ## @@ -0,0 +1,436 @@ +-- Automatically generated by SQLQueryTestSuite +-- !query +CREATE TEMP VIEW df AS +SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, NULL)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +CREATE TEMP VIEW df2 AS +SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b) +-- !query schema +struct<> +-- !query output + + + +-- !query +SELECT listagg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT string_agg(b) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b, NULL) FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +bc +cd + + +-- !query +SELECT listagg(b) FROM df WHERE 1 != 1 +-- !query schema +struct +-- !query output +NULL + + +-- !query +SELECT listagg(b, '|') FROM df GROUP BY a +-- !query schema +struct +-- !query output +NULL +b|c +c|d + + +-- !query +SELECT listagg(a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(DISTINCT a) FROM df +-- !query schema +struct +-- !query output +ab + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df +-- !query schema +struct +-- !query output +NULL +a +b +ba +ba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df +-- !query schema +struct +-- !query output +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df +-- !query schema +struct +-- !query output +b|a|b|a + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df +-- !query schema +struct +-- !query output +baba + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df +-- !query schema +struct +-- !query output +bbaa + + +-- !query +SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭ�� + + +-- !query +SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct +-- !query output +ޭB�� + + +-- !query +SELECT listagg(a), listagg(b, ',') FROM df2 +-- !query schema +struct +-- !query output +123true,false,false + + +-- !query +SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.parser.ParseException +{ + "errorClass" : "PARSE_SYNTAX_ERROR", + "sqlState" : "42601", + "messageParameters" : { +"error" : "','", +"hint" : "" + } +} + + +-- !query +SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES", + "sqlState" : "42K09", + "messageParameters" : { +"dataType" : "(\"BINARY\" or \"STRING\")", 
+"functionName" : "`listagg`", +"sqlExpr" : "\"listagg(c1, , )\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIndex" : 24, +"fragment" : "listagg(c1, ', ')" + } ] +} + + +-- !query +SELECT listagg(b, a) FROM df GROUP BY a +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT", + "sqlState" : "42K09", + "messageParameters" : { +"inputExpr" : "\"a\"", +"inputName" : "`delimiter`", +"inputType" : "\"STRING\"", +"sqlExpr" : "\"listagg(b, a)\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIndex" : 20, +"fragment" : "listagg(b, a)" + } ] +} + + +-- !query +SELECT listagg(a) OVER (ORDER BY a) FROM df +-- !query schema +struct +-- !query output +NULL +aa +aa +aabb +aabb + + +-- !query +SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC", + "sqlState" : "42601", + "messageParameters" : { +"aggFunc" : "\"listagg(a, NULL, a)\"" + }, + "queryContext" : [ { +"objectType" : "", +"objectName" : "", +"startIndex" : 8, +"stopIn
Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]
mikhailnik-db commented on code in PR #48748: URL: https://github.com/apache/spark/pull/48748#discussion_r1843670210 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala: ## @@ -20,9 +20,28 @@ package org.apache.spark.sql.catalyst.expressions.aggregate import org.apache.spark.sql.catalyst.expressions.SortOrder /** - * The trait used to set the [[SortOrder]] after inverse distribution functions parsed. + * The trait used to set the [[SortOrder]] for supporting functions. */ -trait SupportsOrderingWithinGroup { self: AggregateFunction => Review Comment: That was accidental. Added it back -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50081][SQL] Codegen Support for `XPath*`(by Invoke & RuntimeReplaceable) [spark]
panbingkun commented on PR #48610: URL: https://github.com/apache/spark/pull/48610#issuecomment-2479038190 > @panbingkun Could you resolve conflicts, please. Updated, thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]
cloud-fan commented on code in PR #48847: URL: https://github.com/apache/spark/pull/48847#discussion_r1843518188 ## sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala: ## @@ -741,4 +741,21 @@ class ParametersSuite extends QueryTest with SharedSparkSession with PlanTest { Row("c1")) } } + + test("SPARK-50322: parameterized identifier in a sub-query") { +withTable("tt1") { + sql("create table tt1(c1 int)") + sql("insert into tt1 values (1)") + def query(p: String): String = { +s""" + |with v1 as ( + | select * from tt1 + | where 1 = (Select * from identifier($p)) Review Comment: or we should upper case all SQL keywords -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48195][CORE] Save and reuse RDD/Broadcast created by SparkPlan [spark]
cloud-fan commented on PR #48037: URL: https://github.com/apache/spark/pull/48037#issuecomment-2478376297 hmmm there are 4 suppressed exceptions for this simple query? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50309][SQL] Add documentation for SQL pipe syntax [spark]
cloud-fan commented on code in PR #48852: URL: https://github.com/apache/spark/pull/48852#discussion_r1843508769 ## docs/sql-pipe-syntax.md: ## @@ -0,0 +1,540 @@ +--- +layout: global +title: SQL Pipe Syntax +displayTitle: SQL Pipe Syntax +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +### Syntax + + Overview + +Apache Spark supports SQL pipe syntax which allows composing queries from combinations of operators. + +* Any query can have zero or more pipe operators as a suffix, delineated by the pipe character `|>`. +* Each pipe operator starts with one or more SQL keywords followed by its own grammar as described + in the table below. +* Most of these operators reuse existing grammar for standard SQL clauses. +* Operators can apply in any order, any number of times. + +`FROM ` is now a supported standalone query which behaves the same as +`TABLE `. This provides a convenient starting place to begin a chained pipe SQL query, +although it is possible to add one or more pipe operators to the end of any valid Spark SQL query +with the same consistent behavior as written here. + +Please refer to the table at the end of this document for a full list of all supported operators +and their semantics. + + Example + +For example, this is query 13 from the TPC-H benchmark: + +```sql +SELECT c_count, COUNT(*) AS custdist +FROM + (SELECT c_custkey, COUNT(o_orderkey) c_count + FROM customer + LEFT OUTER JOIN orders ON c_custkey = o_custkey + AND o_comment NOT LIKE '%unusual%packages%' GROUP BY c_custkey + ) AS c_orders +GROUP BY c_count +ORDER BY custdist DESC, c_count DESC; +``` + +To write the same logic using SQL pipe operators, we express it like this: + +```sql +FROM customer +|> LEFT OUTER JOIN orders ON c_custkey = o_custkey + AND o_comment NOT LIKE '%unusual%packages%' +|> AGGREGATE COUNT(o_orderkey) c_count + GROUP BY c_custkey +|> AGGREGATE COUNT(*) AS custdist + GROUP BY c_count +|> ORDER BY custdist DESC, c_count DESC; +``` + + Source Tables + +To start a new query using SQL pipe syntax, use the `FROM ` or `TABLE ` +clause, which creates a relation comprising all rows from the source table. Then append one or more +pipe operators to the end of this clause to perform further transformations. + + Projections + +SQL pipe syntax supports composable ways to evaluate expressions. A major advantage of these +projection features is that they support computing new expressions based on previous ones in an +incremental way. No lateral column references are needed here since each operator applies +independently on its input table, regardless of the order in which the operators appear. Each of +these computed columns then becomes visible to use with the following operator. + +`SELECT` produces a new table by evaluating the provided expressions. 
+It is possible to use `DISTINCT` and `*` as needed. +This works like the outermost `SELECT` in a table subquery in regular Spark SQL. + +`EXTEND` adds new columns to the input table by evaluating the provided expressions. +This also preserves table aliases. +This works like `SELECT *, new_column` in regular Spark SQL. + +`DROP` removes columns from the input table. +This is similar to `SELECT * EXCEPT (column)` in regular Spark SQL. + +`SET` replaces column values from the input table. +This is similar to `SELECT * REPLACE (expression AS column)` in regular Spark SQL. + +`AS` forwards the input table and introduces a new alias for each row. + + Aggregations + +In general, aggregation takes place differently using SQL pipe syntax as opposed to regular Spark +SQL. + +To perform full-table aggregation, use the `AGGREGATE` operator with a list of aggregate +expressions to evaluate. This returns one single row in the output table. + +To perform aggregation with grouping, use the `AGGREGATE` operator with a `GROUP BY` clause. +This returns one row for each unique combination of values of the grouping expressions. The output +table contains the evaluated grouping expressions followed by the evaluated aggregate functions. +Grouping expressions support assigning aliases for purposes of referring to them in future +operators. In this way, it is not necessary to re
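To make the projection and aggregation operators described in the snippet above concrete, here is an illustrative chain, wrapped in `spark.sql` for consistency with the other Scala examples in this digest. The `orders` table and its columns are placeholders and are not part of the documentation under review.

```scala
// Illustrative only: EXTEND adds a computed column, WHERE can then reference it,
// AGGREGATE ... GROUP BY aggregates, and ORDER BY sorts the result.
val taxed = spark.sql("""
  FROM orders
  |> EXTEND o_totalprice * 0.1 AS tax
  |> WHERE tax > 5
  |> AGGREGATE SUM(tax) AS total_tax
     GROUP BY o_custkey
  |> ORDER BY total_tax DESC
""")
taxed.show()
```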
[PR] [SPARK-50295][INFRA] Add a script to build docs with image [spark]
panbingkun opened a new pull request, #48860: URL: https://github.com/apache/spark/pull/48860 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50226][SQL] Correct MakeDTInterval and MakeYMInterval to catch Java exceptions [spark]
gotocoding-DB commented on code in PR #48773: URL: https://github.com/apache/spark/pull/48773#discussion_r1842229690 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala: ## @@ -568,17 +572,28 @@ case class MakeYMInterval(years: Expression, months: Expression) override def dataType: DataType = YearMonthIntervalType() override def nullSafeEval(year: Any, month: Any): Any = { -Math.toIntExact(Math.addExact(month.asInstanceOf[Number].longValue(), - Math.multiplyExact(year.asInstanceOf[Number].longValue(), MONTHS_PER_YEAR))) +try { + Math.toIntExact( +Math.addExact(month.asInstanceOf[Int], + Math.multiplyExact(year.asInstanceOf[Int], MONTHS_PER_YEAR))) +} catch { Review Comment: Ok, I'll do it a bit later. UPD: https://github.com/apache/spark/pull/48848 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
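For context, the overall shape of the fix is to keep the exact arithmetic but translate the JVM's `ArithmeticException` into a clearer error. A standalone sketch of that pattern follows; the wrapper exception and names are illustrative and not necessarily what the PR itself uses.

```scala
// Standalone sketch: exact year/month arithmetic with overflow translated into a clearer error.
object YearMonthOverflowDemo extends App {
  private val MonthsPerYear = 12L

  def totalMonths(years: Int, months: Int): Int =
    try {
      Math.toIntExact(
        Math.addExact(months.toLong, Math.multiplyExact(years.toLong, MonthsPerYear)))
    } catch {
      case e: ArithmeticException =>
        // Illustrative wrapper; the real code would raise a Spark error condition instead.
        throw new IllegalArgumentException(
          s"Year-month interval overflows: years=$years, months=$months", e)
    }

  println(totalMonths(2, 3))        // 27
  // totalMonths(Int.MaxValue, 0)   // would throw the wrapped overflow error
}
```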
Re: [PR] [SPARK-50049][SQL] Support custom driver metrics in writing to v2 table [spark]
cloud-fan closed pull request #48573: [SPARK-50049][SQL] Support custom driver metrics in writing to v2 table URL: https://github.com/apache/spark/pull/48573 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50237][SQL] Assign appropriate error condition for `_LEGACY_ERROR_TEMP_2138-9`: `CIRCULAR_CLASS_REFERENCE` [spark]
MaxGekk commented on PR #48769: URL: https://github.com/apache/spark/pull/48769#issuecomment-2478475873 +1, LGTM. Merging to master. Thank you, @itholic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50237][SQL] Assign appropriate error condition for `_LEGACY_ERROR_TEMP_2138-9`: `CIRCULAR_CLASS_REFERENCE` [spark]
MaxGekk closed pull request #48769: [SPARK-50237][SQL] Assign appropriate error condition for `_LEGACY_ERROR_TEMP_2138-9`: `CIRCULAR_CLASS_REFERENCE` URL: https://github.com/apache/spark/pull/48769 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50091][SQL] Handle case of aggregates in left-hand operand of IN-subquery [spark]
bersprockets commented on PR #48627: URL: https://github.com/apache/spark/pull/48627#issuecomment-2479232015 cc @cloud-fan -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]
MaxGekk commented on PR #48847: URL: https://github.com/apache/spark/pull/48847#issuecomment-2479259749 We don't need to backport this fix to `branch-3.5` because it doesn't have the changes from https://github.com/apache/spark/pull/47180. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49490][SQL] Add benchmarks for initCap [spark]
mrk-andreev commented on code in PR #48501: URL: https://github.com/apache/spark/pull/48501#discussion_r1844407932 ## sql/core/benchmarks/CollationBenchmark-jdk21-results.txt: ## @@ -1,54 +1,88 @@ -OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws +Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative time -- -UTF8_BINARY 1349 1349 0 0.1 13485.4 1.0X -UTF8_LCASE 3559 3561 3 0.0 35594.3 2.6X -UNICODE 17580 17589 12 0.0 175803.6 13.0X -UNICODE_CI 17210 17212 2 0.0 172100.2 12.8X +UTF8_BINARY 2220 2223 5 0.0 22197.0 1.0X +UTF8_LCASE 4949 4950 2 0.0 49488.1 2.2X +UNICODE 28172 28198 36 0.0 281721.0 12.7X +UNICODE_CI 28233 28308 106 0.0 282328.2 12.7X -OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws +Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative time --- -UTF8_BINARY 1740 1741 1 0.1 17398.8 1.0X -UTF8_LCASE2630 2632 3 0.0 26301.0 1.5X -UNICODE 16732 16743 16 0.0 167319.7 9.6X -UNICODE_CI 16482 16492 14 0.0 164819.7 9.5X +UTF8_BINARY 2731 2733 2 0.0 27313.6 1.0X +UTF8_LCASE4611 4619 11 0.0 46111.4 1.7X +UNICODE 28149 28211 88 0.0 281486.8 10.3X +UNICODE_CI 27535 27597 89 0.0 275348.4 10.1X -OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws +Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative time -UTF8_BINARY2808 2808 0 0.0 28082.3 1.0X -UTF8_LCASE 5412 5413 1 0.0 54123.5 1.9X -UNICODE 70755 70787 44 0.0 707553.4 25.2X -UNICODE_CI57639 57669 43 0.0 576390.0 20.5X +UTF8_BINARY4603 4618 22 0.0 46031.3 1.0X +UTF8_LCASE 9510 9518 11 0.0 95097.7 2.1X +UNICODE 135718 135786 97 0.0 1357176.2 29.5X +UNICODE_CI 113715 113819 148 0.0 1137145.8 24.7X -OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws +Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative time
Re: [PR] [SPARK-50017] Support Avro encoding for TransformWithState operator [spark]
ericm-db commented on code in PR #48401: URL: https://github.com/apache/spark/pull/48401#discussion_r1844412927 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -115,7 +119,7 @@ case class TransformWithStateExec( * Fetching the columnFamilySchemas from the StatefulProcessorHandle * after init is called. */ - private def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { + def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas closeProcessorHandle() columnFamilySchemas Review Comment: Actually, I don't think you can make it static - we need the `statefulProcessor` that is passed into this particular instance of the `TransformWithStateExec` class. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50236][SQL] Assign appropriate error condition for `_LEGACY_ERROR_TEMP_1156`: `COLUMN_NOT_DEFINED_IN_TABLE ` [spark]
MaxGekk closed pull request #48768: [SPARK-50236][SQL] Assign appropriate error condition for `_LEGACY_ERROR_TEMP_1156`: `COLUMN_NOT_DEFINED_IN_TABLE ` URL: https://github.com/apache/spark/pull/48768 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50236][SQL] Assign appropriate error condition for `_LEGACY_ERROR_TEMP_1156`: `COLUMN_NOT_DEFINED_IN_TABLE ` [spark]
MaxGekk commented on PR #48768: URL: https://github.com/apache/spark/pull/48768#issuecomment-2479344627 +1, LGTM. Merging to master. Thank you, @itholic. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50130][SQL][FOLLOWUP] Make Encoder generation lazy [spark]
ueshin commented on code in PR #48829: URL: https://github.com/apache/spark/pull/48829#discussion_r1844320464 ## sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala: ## @@ -95,13 +95,12 @@ private[sql] object Dataset { def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = sparkSession.withActive { val qe = sparkSession.sessionState.executePlan(logicalPlan) - val encoder = if (qe.isLazyAnalysis) { -RowEncoder.encoderFor(new StructType()) + if (qe.isLazyAnalysis) { Review Comment: The lazy encoder creation only happens when `qe.isLazyAnalysis`; otherwise we create it right here, the same as before, to avoid changing the behavior. `Dataset` has a new default constructor to take a function for the lazy case, and the original default constructor is still there to keep the behavior. The suggested cleanup doesn't keep the original behavior for the non-lazy case. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
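To illustrate the pattern being described, here is a schematic sketch in plain Scala, not the actual `Dataset` code: the lazy-analysis path defers building the encoder until it is first needed, while the non-lazy path builds it eagerly, exactly as before. `expensiveEncoder` is a stand-in for RowEncoder creation from the analyzed schema.

```scala
object LazyEncoderDemo extends App {
  var built = 0
  def expensiveEncoder(): String = { built += 1; "encoder" }

  def makeEncoderHolder(isLazyAnalysis: Boolean): () => String =
    if (isLazyAnalysis) {
      lazy val enc = expensiveEncoder()  // deferred until first access
      () => enc
    } else {
      val enc = expensiveEncoder()       // eager, same behavior as before
      () => enc
    }

  val lazyHolder = makeEncoderHolder(isLazyAnalysis = true)
  assert(built == 0)                     // nothing built yet on the lazy path
  lazyHolder()
  assert(built == 1)

  val eagerHolder = makeEncoderHolder(isLazyAnalysis = false)
  assert(built == 2)                     // built immediately on the non-lazy path
  eagerHolder()
  assert(built == 2)
}
```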
Re: [PR] [SPARK-50017] Support Avro encoding for TransformWithState operator [spark]
brkyvz commented on code in PR #48401: URL: https://github.com/apache/spark/pull/48401#discussion_r1844340161 ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -2204,6 +2204,16 @@ object SQLConf { .intConf .createWithDefault(3) + val STREAMING_STATE_STORE_ENCODING_FORMAT = +buildConf("spark.sql.streaming.stateStore.encodingFormat") + .doc("The encoding format used for stateful operators to store information " + +"in the state store") + .version("4.0.0") + .stringConf + .checkValue(v => Set("UnsafeRow", "Avro").contains(v), +"Valid values are 'UnsafeRow' and 'Avro'") Review Comment: nit: do we want to be case insensitive here? ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -115,7 +119,7 @@ case class TransformWithStateExec( * Fetching the columnFamilySchemas from the StatefulProcessorHandle * after init is called. */ - private def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { + def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas closeProcessorHandle() columnFamilySchemas Review Comment: should this be moved to a static method? ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ## @@ -552,9 +570,9 @@ class IncrementalExecution( // The rule below doesn't change the plan but can cause the side effect that // metadata/schema is written in the checkpoint directory of stateful operator. Review Comment: existing: oof, this is not great... ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ## @@ -552,9 +570,9 @@ class IncrementalExecution( // The rule below doesn't change the plan but can cause the side effect that // metadata/schema is written in the checkpoint directory of stateful operator. planWithStateOpId transform StateSchemaAndOperatorMetadataRule.rule - - simulateWatermarkPropagation(planWithStateOpId) - planWithStateOpId transform WatermarkPropagationRule.rule + val planWithStateSchemas = planWithStateOpId transform StateStoreColumnFamilySchemasRule.rule + simulateWatermarkPropagation(planWithStateSchemas) + planWithStateSchemas transform WatermarkPropagationRule.rule Review Comment: ditto ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ## @@ -259,6 +259,24 @@ class IncrementalExecution( } } + /** + * This rule populates the column family schemas for the TransformWithStateExec + * operator to ship them from the driver, where the schema and serializer objects + * are created, to the executor. + */ + object StateStoreColumnFamilySchemasRule extends SparkPlanPartialRule { +override val rule: PartialFunction[SparkPlan, SparkPlan] = { + case statefulOp: StatefulOperator => +statefulOp match { + case op: TransformWithStateExec => +op.copy( + columnFamilySchemas = op.getColFamilySchemas() +) Review Comment: this is a bit confusing imho. If you have the `getColFamilySchemas` method as part of the class available, why do you have to set it on the class with a copy. Two possible suggestions: 1. Make the `getColFamilySchemas` a static method. Not sure if that's possible though looking at the logic a bit more in TransformWithStateExec. It feels weird that you're opening and closing these handles just to get some of the information out. 2. 
Add a comment here that this needs to be run on the Driver, and also instead rename the method to: `withColumnFamilySchemas` which calls copy internally. ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala: ## @@ -71,6 +224,33 @@ object StateStoreColumnFamilySchemaUtils { stateName, keySchema, valSchema, - Some(PrefixKeyScanStateEncoderSpec(keySchema, 1))) + Some(PrefixKeyScanStateEncoderSpec(keySchema, 1)), + avroEnc = getAvroSerde( +StructType(keySchema.take(1)), +valSchema, +Some(StructType(keySchema.drop(1))) + )) + } + + // This function creates the StateStoreColFamilySchema for + // Timers' secondary index. + // Because we want to encode fixed-length types as binary types + // if we are using Avro, we need to do some schema conversion to ensure + // we can use range scan + def getTimerStateSchemaForSecIndex( + stateName: String, + keySchema: StructType, + valSchema: StructType): StateStoreColFamilySchema = { +val avroKeySchema = StateStoreColumnFamilySchemaUtils. + convertForRangeScan(keySchema, Seq(0)) +
[PR] [DRAFT] Two string types [spark]
stefankandic opened a new pull request, #48861: URL: https://github.com/apache/spark/pull/48861 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-49490][SQL] Add benchmarks for initCap [spark]
mrk-andreev commented on code in PR #48501: URL: https://github.com/apache/spark/pull/48501#discussion_r1844404602 ## sql/core/benchmarks/CollationBenchmark-jdk21-results.txt: ## @@ -1,54 +1,88 @@ -OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws +Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz collation unit benchmarks - equalsFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative time -- -UTF8_BINARY 1353 1357 5 0.1 13532.2 1.0X -UTF8_LCASE 2601 2602 2 0.0 26008.0 1.9X -UNICODE 16745 16756 16 0.0 167450.9 12.4X -UNICODE_CI 16590 16627 52 0.0 165904.8 12.3X +UTF8_BINARY 2220 2223 5 0.0 22197.0 1.0X +UTF8_LCASE 4949 4950 2 0.0 49488.1 2.2X +UNICODE 28172 28198 36 0.0 281721.0 12.7X +UNICODE_CI 28233 28308 106 0.0 282328.2 12.7X -OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws +Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz collation unit benchmarks - compareFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative time --- -UTF8_BINARY 1746 1746 0 0.1 17462.6 1.0X -UTF8_LCASE2629 2630 1 0.0 26294.8 1.5X -UNICODE 16744 16744 0 0.0 167438.6 9.6X -UNICODE_CI 16518 16521 4 0.0 165180.2 9.5X +UTF8_BINARY 2731 2733 2 0.0 27313.6 1.0X +UTF8_LCASE4611 4619 11 0.0 46111.4 1.7X +UNICODE 28149 28211 88 0.0 281486.8 10.3X +UNICODE_CI 27535 27597 89 0.0 275348.4 10.1X -OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws +Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz collation unit benchmarks - hashFunction: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative time -UTF8_BINARY2808 2808 1 0.0 28076.2 1.0X -UTF8_LCASE 5409 5410 0 0.0 54093.0 1.9X -UNICODE 67930 67957 38 0.0 679296.7 24.2X -UNICODE_CI56004 56005 1 0.0 560044.2 19.9X +UTF8_BINARY4603 4618 22 0.0 46031.3 1.0X +UTF8_LCASE 9510 9518 11 0.0 95097.7 2.1X +UNICODE 135718 135786 97 0.0 1357176.2 29.5X +UNICODE_CI 113715 113819 148 0.0 1137145.8 24.7X -OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure -AMD EPYC 7763 64-Core Processor +OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws +Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz collation unit benchmarks - contains: Best Time(ms) Avg Time(ms) Stdev(ms)Rate(M/s) Per Row(ns) Relative time
[PR] [SPARK-50301][SS] Make TransformWithState metrics reflect their intuitive meanings [spark]
neilramaswamy opened a new pull request, #48862: URL: https://github.com/apache/spark/pull/48862 ### What changes were proposed in this pull request? This PR makes the following changes to the metrics in TWS: - `allUpdatesTimeMs` now captures the time it takes to process all the new data with the user's stateful processor. - `timerProcessingTimeMs` was added to capture the time it takes to process all the user's timers. - `allRemovalsTimeMs` now captures the time it takes to do TTL cleanup at the end of a micro-batch. - `commitTimeMs` now captures _only_ the time it takes to commit the state, not the TTL cleanup. With these metrics, a user can have a fairly clear picture of where time is being spent in a micro-batch that uses TWS: ### Why are the changes needed? The metrics today misrepresent what they're actually measuring. ### Does this PR introduce _any_ user-facing change? Yes. Metrics for TWS are changing. However, since TWS is `private[sql]`, this shouldn't impact any real users. ### How was this patch tested? We don't have any way to test these metrics in _any_ stateful operator for streaming today. ### Was this patch authored or co-authored using generative AI tooling? No -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
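For anyone wanting to inspect these timings at runtime, one possible way is shown below. It is a sketch: `query` is assumed to be an active StreamingQuery running a transformWithState pipeline, and it is an assumption on my part that the new `timerProcessingTimeMs` surfaces under `customMetrics`.

```scala
// Pull the per-operator timings from the last progress event.
val progress = query.lastProgress
if (progress != null) {
  progress.stateOperators.foreach { op =>
    println(s"allUpdatesTimeMs  = ${op.allUpdatesTimeMs}")   // stateful processor work on new data
    println(s"allRemovalsTimeMs = ${op.allRemovalsTimeMs}")  // TTL cleanup at the end of the batch
    println(s"commitTimeMs      = ${op.commitTimeMs}")       // state commit only
    // New metric; assumed to be exposed via customMetrics.
    println(s"timerProcessingTimeMs = ${op.customMetrics.get("timerProcessingTimeMs")}")
  }
}
```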
Re: [PR] [SPARK-45592][SPARK-45282][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]
Tom-Newton commented on PR #43760: URL: https://github.com/apache/spark/pull/43760#issuecomment-2479390482 > The fix addresses the issue by disabling coalescing in InMemoryTableScan for shuffles in the final stage. This PR seems to indicate that there will be a correctness bug if we allow coalescing in the final stage. Is this still true after https://github.com/apache/spark/pull/43435 which seems like its fixing the same issue? I have not noticed any correctness issues in this area but I have got a usecase where this PR causes a major performance regression. I notice that https://github.com/apache/spark/pull/45054 brings back the option to enable coalescing in InMemoryTableScan for shuffles in the final stage. If I do this my performance problem is resolved but will I be at risk of the correctness bug again? Details on my performance regression case If you configure `spark.sql.shuffle.partitions` or `spark.sql.adaptive.coalescePartitions.initialPartitionNum` to a large number, we currently use 8192. The following code ends up using 8192 partitions and is really slow as a result. ```python df = spark.createDataFrame( [ { "group": "group0", "df1_column": 1, }, { "group": "group0", "df1_column": 1, }, ] ) df = df.groupBy("group").agg(sf.max("df1_column")) df.cache() df.explain() df.show() ``` With this PR: 40 seconds Without this PR: 2 seconds Maybe its our mistake using `spark.sql.adaptive.coalescePartitions.initialPartitionNum: 8192` but I don't really see any generic way to configure this appropriately. My strategy has just been make it bigger than we would ever need and rely on AQE to coalesce to something sensible, but my understanding could be lacking. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50017] Support Avro encoding for TransformWithState operator [spark]
ericm-db commented on code in PR #48401: URL: https://github.com/apache/spark/pull/48401#discussion_r1844389128 ## connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala: ## @@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericDatumReader import org.apache.avro.io.{BinaryDecoder, DecoderFactory} import org.apache.spark.SparkException +import org.apache.spark.sql.avro.SchemaConverters Review Comment: No, because we changed the directory of the file, we had to add imports. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50017] Support Avro encoding for TransformWithState operator [spark]
ericm-db commented on code in PR #48401: URL: https://github.com/apache/spark/pull/48401#discussion_r1844390849 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala: ## @@ -115,7 +119,7 @@ case class TransformWithStateExec( * Fetching the columnFamilySchemas from the StatefulProcessorHandle * after init is called. */ - private def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { + def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = { val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas closeProcessorHandle() columnFamilySchemas Review Comment: Good question - I guess if we can pass the stateful processor in, it can be. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50017] Support Avro encoding for TransformWithState operator [spark]
ericm-db commented on code in PR #48401: URL: https://github.com/apache/spark/pull/48401#discussion_r1844389823 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala: ## @@ -259,6 +259,24 @@ class IncrementalExecution( } } + /** + * This rule populates the column family schemas for the TransformWithStateExec + * operator to ship them from the driver, where the schema and serializer objects + * are created, to the executor. + */ + object StateStoreColumnFamilySchemasRule extends SparkPlanPartialRule { +override val rule: PartialFunction[SparkPlan, SparkPlan] = { + case statefulOp: StatefulOperator => +statefulOp match { + case op: TransformWithStateExec => Review Comment: Yeah, it will be. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-50309][DOCS] Document `SQL Pipe` Syntax [spark]
dtenedor commented on code in PR #48852: URL: https://github.com/apache/spark/pull/48852#discussion_r1844576182 ## docs/sql-pipe-syntax.md: ## @@ -0,0 +1,540 @@ +--- +layout: global +title: SQL Pipe Syntax +displayTitle: SQL Pipe Syntax +license: | + Licensed to the Apache Software Foundation (ASF) under one or more + contributor license agreements. See the NOTICE file distributed with + this work for additional information regarding copyright ownership. + The ASF licenses this file to You under the Apache License, Version 2.0 + (the "License"); you may not use this file except in compliance with + the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +--- + +### Syntax + + Overview + +Apache Spark supports SQL pipe syntax which allows composing queries from combinations of operators. + +* Any query can have zero or more pipe operators as a suffix, delineated by the pipe character `|>`. +* Each pipe operator starts with one or more SQL keywords followed by its own grammar as described + in the table below. +* Most of these operators reuse existing grammar for standard SQL clauses. +* Operators can apply in any order, any number of times. + +`FROM ` is now a supported standalone query which behaves the same as +`TABLE `. This provides a convenient starting place to begin a chained pipe SQL query, +although it is possible to add one or more pipe operators to the end of any valid Spark SQL query +with the same consistent behavior as written here. + +Please refer to the table at the end of this document for a full list of all supported operators +and their semantics. + + Example + +For example, this is query 13 from the TPC-H benchmark: + +```sql +SELECT c_count, COUNT(*) AS custdist +FROM + (SELECT c_custkey, COUNT(o_orderkey) c_count + FROM customer + LEFT OUTER JOIN orders ON c_custkey = o_custkey + AND o_comment NOT LIKE '%unusual%packages%' GROUP BY c_custkey + ) AS c_orders +GROUP BY c_count +ORDER BY custdist DESC, c_count DESC; +``` + +To write the same logic using SQL pipe operators, we express it like this: + +```sql +FROM customer +|> LEFT OUTER JOIN orders ON c_custkey = o_custkey + AND o_comment NOT LIKE '%unusual%packages%' +|> AGGREGATE COUNT(o_orderkey) c_count + GROUP BY c_custkey +|> AGGREGATE COUNT(*) AS custdist + GROUP BY c_count +|> ORDER BY custdist DESC, c_count DESC; +``` + + Source Tables + +To start a new query using SQL pipe syntax, use the `FROM ` or `TABLE ` +clause, which creates a relation comprising all rows from the source table. Then append one or more +pipe operators to the end of this clause to perform further transformations. + + Projections + +SQL pipe syntax supports composable ways to evaluate expressions. A major advantage of these +projection features is that they support computing new expressions based on previous ones in an +incremental way. No lateral column references are needed here since each operator applies +independently on its input table, regardless of the order in which the operators appear. Each of +these computed columns then becomes visible to use with the following operator. + +`SELECT` produces a new table by evaluating the provided expressions. 
+It is possible to use `DISTINCT` and `*` as needed. +This works like the outermost `SELECT` in a table subquery in regular Spark SQL. + +`EXTEND` adds new columns to the input table by evaluating the provided expressions. +This also preserves table aliases. +This works like `SELECT *, new_column` in regular Spark SQL. + +`DROP` removes columns from the input table. +This is similar to `SELECT * EXCEPT (column)` in regular Spark SQL. + +`SET` replaces column values from the input table. +This is similar to `SELECT * REPLACE (expression AS column)` in regular Spark SQL. + +`AS` forwards the input table and introduces a new alias for each row. + + Aggregations + +In general, aggregation takes place differently using SQL pipe syntax as opposed to regular Spark +SQL. + +To perform full-table aggregation, use the `AGGREGATE` operator with a list of aggregate +expressions to evaluate. This returns one single row in the output table. + +To perform aggregation with grouping, use the `AGGREGATE` operator with a `GROUP BY` clause. +This returns one row for each unique combination of values of the grouping expressions. The output +table contains the evaluated grouping expressions followed by the evaluated aggregate functions. +Grouping expressions support assigning aliases for purposes of referring to them in future +operators. In this way, it is not necessary to rep
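As a small illustrative sketch of the projection and aggregation pipe operators described in the excerpt above, assuming an existing SparkSession named `spark` and that pipe syntax is available in the build; the inline table, column names, and values are made up:

```scala
// Sketch only: exercises EXTEND, DROP, AGGREGATE ... GROUP BY, and ORDER BY on
// an inline VALUES relation with invented names.
val result = spark.sql("""
  VALUES (1, 'a', 10), (2, 'a', 20), (3, 'b', 30) tab(id, grp, amount)
  |> EXTEND amount * 2 AS doubled      -- add a computed column, keep the rest
  |> DROP id                           -- remove a column we no longer need
  |> AGGREGATE SUM(doubled) AS total   -- one row per distinct grp value
     GROUP BY grp
  |> ORDER BY total DESC
""")
result.show()
```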
Re: [PR] [SPARK-49787][SQL] Cast between UDT and other types [spark]
cloud-fan commented on PR #48251: URL: https://github.com/apache/spark/pull/48251#issuecomment-2480296031 I don't think ANSI SQL allows casting between UDT and built-in types. Shall we use the `UnwrapUDT` expression to unwrap expressions that return UDT in the store assignment rule `ResolveOutputRelation`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
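As a rough sketch of this suggestion (not the actual rule; the helper name, the exact import path of `UnwrapUDT`, and the type comparison are assumptions for illustration):

```scala
// Sketch only: when an input expression produces a UDT whose underlying SQL type
// matches the target column's type, unwrap it rather than inserting a cast.
import org.apache.spark.sql.catalyst.expressions.{Expression, UnwrapUDT}
import org.apache.spark.sql.types.{DataType, UserDefinedType}

def unwrapIfUdt(input: Expression, expectedType: DataType): Expression =
  input.dataType match {
    case udt: UserDefinedType[_] if udt.sqlType == expectedType =>
      UnwrapUDT(input) // expose the UDT's physical representation to the writer
    case _ =>
      input
  }
```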
Re: [PR] [SPARK-49787][SQL] Cast between UDT and other types [spark]
viirya commented on PR #48251: URL: https://github.com/apache/spark/pull/48251#issuecomment-2480395470 > I don't think ANSI SQL allows casting between UDT and built-in types. Shall we use the `UnwrapUDT` expression to unwrap expressions that return UDT in the store assignment rule `ResolveOutputRelation`? Sounds good. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org