[PR] [SPARK-50325][SQL] Factor out alias resolution to be reused in the single-pass Analyzer [spark]

2024-11-15 Thread via GitHub


vladimirg-db opened a new pull request, #48857:
URL: https://github.com/apache/spark/pull/48857

   ### What changes were proposed in this pull request?
   
   Factor out alias resolution code to the `AliasResolution` object.
   
   ### Why are the changes needed?
   
   Some Analyzer code will be used in both fixed-point and single-pass 
Analyzers. Also, Analyzer.scala is 4K+ lines long, so it makes sense to 
gradually split it.
   
   Context: https://issues.apache.org/jira/browse/SPARK-49834
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. It's a pure refactoring.
   
   ### How was this patch tested?
   
   Existing tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.





Re: [PR] [SPARK-50313][SQL][TESTS] Enable ANSI in SQL *SQLQueryTestSuite by default [spark]

2024-11-15 Thread via GitHub


yaooqinn commented on code in PR #48842:
URL: https://github.com/apache/spark/pull/48842#discussion_r1843367043


##
sql/core/src/test/resources/sql-tests/inputs/decimalArithmeticOperations.sql:
##
@@ -88,7 +88,7 @@ SELECT CAST(10 AS DECIMAL(10, 2)) div CAST(3 AS DECIMAL(5, 
1));
 set spark.sql.decimalOperations.allowPrecisionLoss=false;
 
 -- test decimal operations
-select id, a+b, a-b, a*b, a/b from decimals_test order by id;
+select /*+ COALESCE(1) */ id, a+b, a-b, a*b, a/b from decimals_test order by 
id;

Review Comment:
   stabilize error






Re: [PR] [SPARK-50313][SQL][TESTS] Enable ANSI in SQL *SQLQueryTestSuite by default [spark]

2024-11-15 Thread via GitHub


yaooqinn commented on code in PR #48842:
URL: https://github.com/apache/spark/pull/48842#discussion_r1843366680


##
sql/core/src/test/resources/sql-tests/inputs/udf/udf-union.sql:
##
@@ -11,7 +11,7 @@ FROM   (SELECT udf(c1) as c1, udf(c2) as c2 FROM t1
 
 -- Type Coerced Union
 SELECT udf(c1) as c1, udf(c2) as c2
-FROM   (SELECT udf(c1) as c1, udf(c2) as c2 FROM t1
+FROM   (SELECT udf(c1) as c1, udf(c2) as c2 FROM t1 WHERE c2 = 'a'

Review Comment:
   stabilize error



##
sql/core/src/test/resources/sql-tests/inputs/pipe-operators.sql:
##
@@ -548,7 +548,8 @@ values (0, 'abc') tab(x, y)
 
 -- Union distinct with a VALUES list.
 values (0, 1) tab(x, y)
-|> union table t;
+|> union table t
+|> where x = 0;

Review Comment:
   stabilize error






[PR] [SPARK-50327][SQL] Factor out function resolution to be reused in the single-pass Analyzer [spark]

2024-11-15 Thread via GitHub


vladimirg-db opened a new pull request, #48858:
URL: https://github.com/apache/spark/pull/48858

   ### What changes were proposed in this pull request?
   
   Factor out function resolution code to the `FunctionResolution` object.
   
   ### Why are the changes needed?
   
   Some Analyzer code will be used in both fixed-point and single-pass 
Analyzers. Also, Analyzer.scala is 4K+ lines long, so it makes sense to 
gradually split it.
   
   Context: https://issues.apache.org/jira/browse/SPARK-49834
   
   ### Does this PR introduce _any_ user-facing change?
   
   No. It's a pure refactoring.
   
   ### How was this patch tested?
   
   Existing tests.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.





Re: [PR] [SPARK-50309][SQL] Add documentation for SQL pipe syntax [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48852:
URL: https://github.com/apache/spark/pull/48852#discussion_r1843507956


##
docs/sql-pipe-syntax.md:
##
@@ -0,0 +1,540 @@
+---
+layout: global
+title: SQL Pipe Syntax
+displayTitle: SQL Pipe Syntax
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+ 
+ http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+### Syntax
+
+#### Overview
+
+Apache Spark supports SQL pipe syntax which allows composing queries from 
combinations of operators.
+
+* Any query can have zero or more pipe operators as a suffix, delineated by 
the pipe character `|>`. 
+* Each pipe operator starts with one or more SQL keywords followed by its own 
grammar as described
+  in the table below.
+* Most of these operators reuse existing grammar for standard SQL clauses.
+* Operators can apply in any order, any number of times.
+
+`FROM <tableName>` is now a supported standalone query which behaves the same 
as
+`TABLE <tableName>`. This provides a convenient starting place to begin a 
chained pipe SQL query,
+although it is possible to add one or more pipe operators to the end of any 
valid Spark SQL query
+with the same consistent behavior as written here.
+
+Please refer to the table at the end of this document for a full list of all 
supported operators
+and their semantics.
+
+#### Example
+
+For example, this is query 13 from the TPC-H benchmark:
+
+```sql
+SELECT c_count, COUNT(*) AS custdist
+FROM
+  (SELECT c_custkey, COUNT(o_orderkey) c_count
+   FROM customer
+   LEFT OUTER JOIN orders ON c_custkey = o_custkey
+ AND o_comment NOT LIKE '%unusual%packages%' GROUP BY c_custkey
+  ) AS c_orders
+GROUP BY c_count
+ORDER BY custdist DESC, c_count DESC;
+```
+
+To write the same logic using SQL pipe operators, we express it like this:
+
+```sql
+FROM customer
+|> LEFT OUTER JOIN orders ON c_custkey = o_custkey
+   AND o_comment NOT LIKE '%unusual%packages%'
+|> AGGREGATE COUNT(o_orderkey) c_count
+   GROUP BY c_custkey
+|> AGGREGATE COUNT(*) AS custdist
+   GROUP BY c_count
+|> ORDER BY custdist DESC, c_count DESC;
+```
+
+#### Source Tables
+
+To start a new query using SQL pipe syntax, use the `FROM <tableName>` or 
`TABLE <tableName>`
+clause, which creates a relation comprising all rows from the source table. 
Then append one or more
+pipe operators to the end of this clause to perform further transformations.
+
+#### Projections
+
+SQL pipe syntax supports composable ways to evaluate expressions. A major 
advantage of these
+projection features is that they support computing new expressions based on 
previous ones in an
+incremental way. No lateral column references are needed here since each 
operator applies
+independently on its input table, regardless of the order in which the 
operators appear. Each of
+these computed columns then becomes visible to use with the following operator.
+
+`SELECT` produces a new table by evaluating the provided expressions.
+It is possible to use `DISTINCT` and `*` as needed.
+This works like the outermost `SELECT` in a table subquery in regular Spark 
SQL.
+
+`EXTEND` adds new columns to the input table by evaluating the provided 
expressions.
+This also preserves table aliases.
+This works like `SELECT *, new_column` in regular Spark SQL.
+
+`DROP` removes columns from the input table.
+This is similar to `SELECT * EXCEPT (column)` in regular Spark SQL.
+
+`SET` replaces column values from the input table.
+This is similar to `SELECT * REPLACE (expression AS column)` in regular Spark 
SQL.
+
+`AS` forwards the input table and introduces a new alias for each row.
+
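For illustration, a minimal sketch (not part of the quoted patch) chaining these projection operators; `c_acctbal` and `c_comment` are assumed TPC-H `customer` columns:

```sql
FROM customer
|> EXTEND c_acctbal * 1.1 AS boosted_balance        -- add a computed column
|> SET boosted_balance = round(boosted_balance, 2)  -- replace its value
|> DROP c_comment                                   -- remove a column
|> AS c                                             -- re-alias the current table
|> SELECT c_custkey, boosted_balance;
```
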
+#### Aggregations
+
+In general, aggregation takes place differently using SQL pipe syntax as 
opposed to regular Spark
+SQL.
+
+To perform full-table aggregation, use the `AGGREGATE` operator with a list of 
aggregate
+expressions to evaluate. This returns one single row in the output table.
+
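An illustrative sketch of full-table aggregation (reusing the TPC-H `orders` table from the example above; `o_totalprice` is an assumed column):

```sql
FROM orders
|> AGGREGATE COUNT(*) AS num_orders, SUM(o_totalprice) AS total_spent;
```
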
+To perform aggregation with grouping, use the `AGGREGATE` operator with a 
`GROUP BY` clause.
+This returns one row for each unique combination of values of the grouping 
expressions. The output
+table contains the evaluated grouping expressions followed by the evaluated 
aggregate functions.
+Grouping expressions support assigning aliases for purposes of referring to 
them in future
+operators. In this way, it is not necessary to re

Re: [PR] [SPARK-48195][CORE] Save and reuse RDD/Broadcast created by SparkPlan [spark]

2024-11-15 Thread via GitHub


yaooqinn commented on PR #48037:
URL: https://github.com/apache/spark/pull/48037#issuecomment-2478403269

   It looks like the number of suppressed exceptions can be calculated as `2 * (number of calls to Utils.doTryWithCallerStacktrace)`





Re: [PR] [SPARK-50313][SQL][TESTS] Enable ANSI in SQL *SQLQueryTestSuite by default [spark]

2024-11-15 Thread via GitHub


yaooqinn commented on PR #48842:
URL: https://github.com/apache/spark/pull/48842#issuecomment-2478422326

   cc @dongjoon-hyun as the initiator of SPARK-4, also cc @cloud-fan, thank 
you





Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48847:
URL: https://github.com/apache/spark/pull/48847#discussion_r1843517881


##
sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala:
##
@@ -741,4 +741,21 @@ class ParametersSuite extends QueryTest with 
SharedSparkSession with PlanTest {
 Row("c1"))
 }
   }
+
+  test("SPARK-50322: parameterized identifier in a sub-query") {
+withTable("tt1") {
+  sql("create table tt1(c1 int)")
+  sql("insert into tt1 values (1)")
+  def query(p: String): String = {
+s"""
+  |with v1 as (
+  |  select * from tt1
+  |  where 1 = (Select * from identifier($p))

Review Comment:
   ```suggestion
 |  where 1 = (select * from identifier($p))
   ```
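
For reference, an illustrative sketch (not taken from the PR) of how such a parameterized query is typically exercised in `ParametersSuite`; `checkAnswer` and `Row` come from the test base classes, and the expected result assumes `tt1` contains the single row `(1)`:

```scala
// Named parameter: the identifier is supplied via the args map.
checkAnswer(spark.sql(query(":tab"), Map("tab" -> "tt1")), Row(1))
// Positional parameter: the identifier is supplied via the args array.
checkAnswer(spark.sql(query("?"), Array("tt1")), Row(1))
```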






Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]

2024-11-15 Thread via GitHub


MaxGekk commented on code in PR #48847:
URL: https://github.com/apache/spark/pull/48847#discussion_r1843594293


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala:
##
@@ -189,7 +189,8 @@ object BindParameters extends ParameterizedQueryProcessor 
with QueryErrorsBase {
   // We should wait for `CTESubstitution` to resolve CTE before binding 
parameters, as CTE
   // relations are not children of `UnresolvedWith`.

Review Comment:
   Can we do that in two steps: 1. fix the issue, 2. do the proposed refactoring? 
It seems less risky, so the first patch could be backported more safely.






Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843594523


##
sql/core/src/test/resources/sql-tests/results/listagg.sql.out:
##
@@ -0,0 +1,436 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, 
NULL)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT listagg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT string_agg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b, NULL) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b) FROM df WHERE 1 != 1
+-- !query schema
+struct
+-- !query output
+NULL
+
+
+-- !query
+SELECT listagg(b, '|') FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+b|c
+c|d
+
+
+-- !query
+SELECT listagg(a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(DISTINCT a) FROM df
+-- !query schema
+struct
+-- !query output
+ab
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df
+-- !query schema
+struct
+-- !query output
+NULL
+a
+b
+ba
+ba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+b|a|b|a
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭB��
+
+
+-- !query
+SELECT listagg(a), listagg(b, ',') FROM df2
+-- !query schema
+struct
+-- !query output
+123true,false,false
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",

Review Comment:
   what's the point of this test?






Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843597510


##
sql/core/src/test/resources/sql-tests/results/listagg.sql.out:
##
@@ -0,0 +1,436 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, 
NULL)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT listagg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT string_agg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b, NULL) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b) FROM df WHERE 1 != 1
+-- !query schema
+struct
+-- !query output
+NULL
+
+
+-- !query
+SELECT listagg(b, '|') FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+b|c
+c|d
+
+
+-- !query
+SELECT listagg(a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(DISTINCT a) FROM df
+-- !query schema
+struct
+-- !query output
+ab
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df
+-- !query schema
+struct
+-- !query output
+NULL
+a
+b
+ba
+ba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+b|a|b|a
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭB��
+
+
+-- !query
+SELECT listagg(a), listagg(b, ',') FROM df2
+-- !query schema
+struct
+-- !query output
+123true,false,false
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+"error" : "','",
+"hint" : ""
+  }
+}
+
+
+-- !query
+SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+"dataType" : "(\"BINARY\" or \"STRING\")",
+"functionName" : "`listagg`",
+"sqlExpr" : "\"listagg(c1, , )\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex" : 24,
+"fragment" : "listagg(c1, ', ')"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(b, a) FROM df GROUP BY a
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+"inputExpr" : "\"a\"",
+"inputName" : "`delimiter`",
+"inputType" : "\"STRING\"",
+"sqlExpr" : "\"listagg(b, a)\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex" : 20,
+"fragment" : "listagg(b, a)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct
+-- !query output
+NULL
+aa
+aa
+aabb
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+  "sqlState" : "42601",
+  "messageParameters" : {
+"aggFunc" : "\"listagg(a, NULL, a)\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex"

Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843595865


##
sql/core/src/test/resources/sql-tests/results/listagg.sql.out:
##
@@ -0,0 +1,436 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, 
NULL)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT listagg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT string_agg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b, NULL) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b) FROM df WHERE 1 != 1
+-- !query schema
+struct
+-- !query output
+NULL
+
+
+-- !query
+SELECT listagg(b, '|') FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+b|c
+c|d
+
+
+-- !query
+SELECT listagg(a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(DISTINCT a) FROM df
+-- !query schema
+struct
+-- !query output
+ab
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df
+-- !query schema
+struct
+-- !query output
+NULL
+a
+b
+ba
+ba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+b|a|b|a
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭB��
+
+
+-- !query
+SELECT listagg(a), listagg(b, ',') FROM df2
+-- !query schema
+struct
+-- !query output
+123true,false,false
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+"error" : "','",
+"hint" : ""
+  }
+}
+
+
+-- !query
+SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+"dataType" : "(\"BINARY\" or \"STRING\")",
+"functionName" : "`listagg`",
+"sqlExpr" : "\"listagg(c1, , )\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex" : 24,
+"fragment" : "listagg(c1, ', ')"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(b, a) FROM df GROUP BY a
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+"inputExpr" : "\"a\"",
+"inputName" : "`delimiter`",
+"inputType" : "\"STRING\"",
+"sqlExpr" : "\"listagg(b, a)\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex" : 20,
+"fragment" : "listagg(b, a)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct
+-- !query output
+NULL
+aa
+aa
+aabb
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+  "sqlState" : "42601",
+  "messageParameters" : {
+"aggFunc" : "\"listagg(a, NULL, a)\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex"

Re: [PR] [SPARK-49490][SQL] Add benchmarks for initCap [spark]

2024-11-15 Thread via GitHub


stevomitric commented on code in PR #48501:
URL: https://github.com/apache/spark/pull/48501#discussion_r1843598536


##
sql/core/benchmarks/CollationBenchmark-jdk21-results.txt:
##
@@ -1,54 +1,88 @@
-OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws
+Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 collation unit benchmarks - equalsFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns) Relative time
 
--
-UTF8_BINARY  1353   1357   
5  0.1   13532.2   1.0X
-UTF8_LCASE   2601   2602   
2  0.0   26008.0   1.9X
-UNICODE 16745  16756   
   16  0.0  167450.9  12.4X
-UNICODE_CI  16590  16627   
   52  0.0  165904.8  12.3X
+UTF8_BINARY  2220   2223   
5  0.0   22197.0   1.0X
+UTF8_LCASE   4949   4950   
2  0.0   49488.1   2.2X
+UNICODE 28172  28198   
   36  0.0  281721.0  12.7X
+UNICODE_CI  28233  28308   
  106  0.0  282328.2  12.7X
 
-OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws
+Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 collation unit benchmarks - compareFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns) Relative time
 
---
-UTF8_BINARY   1746   1746  
 0  0.1   17462.6   1.0X
-UTF8_LCASE2629   2630  
 1  0.0   26294.8   1.5X
-UNICODE  16744  16744  
 0  0.0  167438.6   9.6X
-UNICODE_CI   16518  16521  
 4  0.0  165180.2   9.5X
+UTF8_BINARY   2731   2733  
 2  0.0   27313.6   1.0X
+UTF8_LCASE4611   4619  
11  0.0   46111.4   1.7X
+UNICODE  28149  28211  
88  0.0  281486.8  10.3X
+UNICODE_CI   27535  27597  
89  0.0  275348.4  10.1X
 
-OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws
+Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 collation unit benchmarks - hashFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns) Relative time
 

-UTF8_BINARY2808   2808 
  1  0.0   28076.2   1.0X
-UTF8_LCASE 5409   5410 
  0  0.0   54093.0   1.9X
-UNICODE   67930  67957 
 38  0.0  679296.7  24.2X
-UNICODE_CI56004  56005 
  1  0.0  560044.2  19.9X
+UTF8_BINARY4603   4618 
 22  0.0   46031.3   1.0X
+UTF8_LCASE 9510   9518 
 11  0.0   95097.7   2.1X
+UNICODE  135718 135786 
 97  0.0 1357176.2  29.5X
+UNICODE_CI   113715 113819 
148  0.0 1137145.8  24.7X
 
-OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws
+Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 collation unit benchmarks - contains: Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns) Relative time
 

[PR] [SPARK-50328][INFRA] Add a separate docker file for SparkR [spark]

2024-11-15 Thread via GitHub


zhengruifeng opened a new pull request, #48859:
URL: https://github.com/apache/spark/pull/48859

   ### What changes were proposed in this pull request?
   Add a separate docker file for SparkR
   
   ### Why are the changes needed?
   For env isolation
   
   
   ### Does this PR introduce _any_ user-facing change?
   No, infra-only
   
   
   ### How was this patch tested?
   CI
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No





Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48847:
URL: https://github.com/apache/spark/pull/48847#discussion_r1843600908


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala:
##
@@ -189,7 +189,8 @@ object BindParameters extends ParameterizedQueryProcessor 
with QueryErrorsBase {
   // We should wait for `CTESubstitution` to resolve CTE before binding 
parameters, as CTE
   // relations are not children of `UnresolvedWith`.

Review Comment:
   +1






Re: [PR] [SPARK-50130][SQL][FOLLOWUP] Make Encoder generation lazy [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48829:
URL: https://github.com/apache/spark/pull/48829#discussion_r1843600376


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -95,13 +95,12 @@ private[sql] object Dataset {
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame =
 sparkSession.withActive {
   val qe = sparkSession.sessionState.executePlan(logicalPlan)
-  val encoder = if (qe.isLazyAnalysis) {
-RowEncoder.encoderFor(new StructType())
+  if (qe.isLazyAnalysis) {

Review Comment:
   I don't get it. My proposal is a pure code cleanup that doesn't change any 
actual logic.






Re: [PR] [MINOR] Fix code style for if/for/while statements [spark]

2024-11-15 Thread via GitHub


MaxGekk closed pull request #48425: [MINOR] Fix code style for if/for/while 
statements
URL: https://github.com/apache/spark/pull/48425





Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843593480


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -265,3 +271,257 @@ private[aggregate] object CollectTopK {
 case _ => throw QueryCompilationErrors.invalidNumParameter(e)
   }
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+_FUNC_(expr[, delimiter])[ WITHIN GROUP (ORDER BY key [ASC | DESC] 
[,...])] - Returns
+the concatenation of non-null input values, separated by the delimiter 
ordered by key.
+If all values are null, null is returned.
+""",
+  arguments = """
+Arguments:
+  * expr - a string or binary expression to be concatenated.
+  * delimiter - an optional string or binary foldable expression used to 
separate the input values.
+If null, the concatenation will be performed without a delimiter. 
Default is null.
+  * key - an optional expression for ordering the input values. Multiple 
keys can be specified.
+If none are specified, the order of the rows in the result is 
non-deterministic.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   abc
+  > SELECT _FUNC_(col) WITHIN GROUP (ORDER BY col DESC) FROM VALUES ('a'), 
('b'), ('c') AS tab(col);
+   cba
+  > SELECT _FUNC_(col) FROM VALUES ('a'), (NULL), ('b') AS tab(col);
+   ab
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   aa
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   ab
+  > SELECT _FUNC_(col, ', ') FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a, b, c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  note = """
+* If the order is not specified, the function is non-deterministic because
+the order of the rows may be non-deterministic after a shuffle.
+* If DISTINCT is specified, then expr and key must be the same expression.
+  """,
+  group = "agg_funcs",
+  since = "4.0.0"
+)
+// scalastyle:on line.size.limit
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal(null),
+orderExpressions: Seq[SortOrder] = Nil,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0)
+  extends Collect[mutable.ArrayBuffer[Any]]
+  with SupportsOrderingWithinGroup
+  with ImplicitCastInputTypes {
+
+  override def isOrderingMandatory: Boolean = false
+  override def isDistinctSupported: Boolean = true
+  override protected lazy val bufferElementType: DataType = {
+if (noNeedSaveOrderValue) {
+  child.dataType
+} else {
+  StructType(
+StructField("value", child.dataType)
++: orderValuesField
+  )
+}
+  }
+  /** Indicates that the result of [[child]] is enough for evaluation  */
+  private lazy val noNeedSaveOrderValue: Boolean = 
isOrderCompatible(orderExpressions)
+
+  def this(child: Expression) =
+this(child, Literal(null), Nil, 0, 0)
+
+  def this(child: Expression, delimiter: Expression) =
+this(child, delimiter, Nil, 0, 0)
+
+  override def nullable: Boolean = true
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = 
mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): 
ImperativeAggregate =
+copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
ImperativeAggregate =
+copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def defaultResult: Option[Literal] = Option(Literal.create(null, 
dataType))
+
+  override def sql(isDistinct: Boolean): String = {
+val distinct = if (isDistinct) "DISTINCT " else ""
+val withinGroup = if (orderingFilled) {
+  s" WITHIN GROUP (ORDER BY ${orderExpressions.map(_.sql).mkString(", ")})"
+} else {
+  ""
+}
+s"$prettyName($distinct${child.sql}, ${delimiter.sql})$withinGroup"
+  }
+
+  override def inputTypes: Seq[AbstractDataType] =
+TypeCollection(
+  StringTypeWithCollation(supportsTrimCollation = true),
+  BinaryType
+) +:
+TypeCollection(
+  StringTypeWithCollation(supportsTrimCollation = true),
+  BinaryType,
+  NullType
+) +:
+orderExpressions.map(_ => AnyDataType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val matchInputTypes = super.checkInputDataTypes()
+if (matchInputTypes.isFailure) {
+  return matchInputTypes
+}
+if (!delimiter.foldable) {
+  return DataTypeMismatch(
+errorSubClass = "NON_FOLDABLE_INPUT",
+messageParameters = Map(
+  "inputName" -> toSQLId("delimiter"),
+  "inputType" -> toSQLType(delimiter.dataType),
+  "inputExpr" -> toSQLExpr(delimiter)
+)
+  )
+}
+if (delimiter.dataType ==

Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843593193


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/collect.scala:
##
@@ -265,3 +271,257 @@ private[aggregate] object CollectTopK {
 case _ => throw QueryCompilationErrors.invalidNumParameter(e)
   }
 }
+
+// scalastyle:off line.size.limit
+@ExpressionDescription(
+  usage = """
+_FUNC_(expr[, delimiter])[ WITHIN GROUP (ORDER BY key [ASC | DESC] 
[,...])] - Returns
+the concatenation of non-null input values, separated by the delimiter 
ordered by key.
+If all values are null, null is returned.
+""",
+  arguments = """
+Arguments:
+  * expr - a string or binary expression to be concatenated.
+  * delimiter - an optional string or binary foldable expression used to 
separate the input values.
+If null, the concatenation will be performed without a delimiter. 
Default is null.
+  * key - an optional expression for ordering the input values. Multiple 
keys can be specified.
+If none are specified, the order of the rows in the result is 
non-deterministic.
+  """,
+  examples = """
+Examples:
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   abc
+  > SELECT _FUNC_(col) WITHIN GROUP (ORDER BY col DESC) FROM VALUES ('a'), 
('b'), ('c') AS tab(col);
+   cba
+  > SELECT _FUNC_(col) FROM VALUES ('a'), (NULL), ('b') AS tab(col);
+   ab
+  > SELECT _FUNC_(col) FROM VALUES ('a'), ('a') AS tab(col);
+   aa
+  > SELECT _FUNC_(DISTINCT col) FROM VALUES ('a'), ('a'), ('b') AS 
tab(col);
+   ab
+  > SELECT _FUNC_(col, ', ') FROM VALUES ('a'), ('b'), ('c') AS tab(col);
+   a, b, c
+  > SELECT _FUNC_(col) FROM VALUES (NULL), (NULL) AS tab(col);
+   NULL
+  """,
+  note = """
+* If the order is not specified, the function is non-deterministic because
+the order of the rows may be non-deterministic after a shuffle.
+* If DISTINCT is specified, then expr and key must be the same expression.
+  """,
+  group = "agg_funcs",
+  since = "4.0.0"
+)
+// scalastyle:on line.size.limit
+case class ListAgg(
+child: Expression,
+delimiter: Expression = Literal(null),
+orderExpressions: Seq[SortOrder] = Nil,
+mutableAggBufferOffset: Int = 0,
+inputAggBufferOffset: Int = 0)
+  extends Collect[mutable.ArrayBuffer[Any]]
+  with SupportsOrderingWithinGroup
+  with ImplicitCastInputTypes {
+
+  override def isOrderingMandatory: Boolean = false
+  override def isDistinctSupported: Boolean = true
+  override protected lazy val bufferElementType: DataType = {
+if (noNeedSaveOrderValue) {
+  child.dataType
+} else {
+  StructType(
+StructField("value", child.dataType)
++: orderValuesField
+  )
+}
+  }
+  /** Indicates that the result of [[child]] is enough for evaluation  */
+  private lazy val noNeedSaveOrderValue: Boolean = 
isOrderCompatible(orderExpressions)
+
+  def this(child: Expression) =
+this(child, Literal(null), Nil, 0, 0)
+
+  def this(child: Expression, delimiter: Expression) =
+this(child, delimiter, Nil, 0, 0)
+
+  override def nullable: Boolean = true
+
+  override def createAggregationBuffer(): mutable.ArrayBuffer[Any] = 
mutable.ArrayBuffer.empty
+
+  override def withNewMutableAggBufferOffset(newMutableAggBufferOffset: Int): 
ImperativeAggregate =
+copy(mutableAggBufferOffset = newMutableAggBufferOffset)
+
+  override def withNewInputAggBufferOffset(newInputAggBufferOffset: Int): 
ImperativeAggregate =
+copy(inputAggBufferOffset = newInputAggBufferOffset)
+
+  override def defaultResult: Option[Literal] = Option(Literal.create(null, 
dataType))
+
+  override def sql(isDistinct: Boolean): String = {
+val distinct = if (isDistinct) "DISTINCT " else ""
+val withinGroup = if (orderingFilled) {
+  s" WITHIN GROUP (ORDER BY ${orderExpressions.map(_.sql).mkString(", ")})"
+} else {
+  ""
+}
+s"$prettyName($distinct${child.sql}, ${delimiter.sql})$withinGroup"
+  }
+
+  override def inputTypes: Seq[AbstractDataType] =
+TypeCollection(
+  StringTypeWithCollation(supportsTrimCollation = true),
+  BinaryType
+) +:
+TypeCollection(
+  StringTypeWithCollation(supportsTrimCollation = true),
+  BinaryType,
+  NullType
+) +:
+orderExpressions.map(_ => AnyDataType)
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+val matchInputTypes = super.checkInputDataTypes()
+if (matchInputTypes.isFailure) {
+  return matchInputTypes
+}
+if (!delimiter.foldable) {
+  return DataTypeMismatch(
+errorSubClass = "NON_FOLDABLE_INPUT",
+messageParameters = Map(
+  "inputName" -> toSQLId("delimiter"),
+  "inputType" -> toSQLType(delimiter.dataType),
+  "inputExpr" -> toSQLExpr(delimiter)
+)
+  )
+}
+if (delimiter.dataType ==

Re: [PR] [SPARK-50327][SQL] Factor out function resolution to be reused in the single-pass Analyzer [spark]

2024-11-15 Thread via GitHub


MaxGekk closed pull request #48858: [SPARK-50327][SQL] Factor out function 
resolution to be reused in the single-pass Analyzer
URL: https://github.com/apache/spark/pull/48858





Re: [PR] [SPARK-50327][SQL] Factor out function resolution to be reused in the single-pass Analyzer [spark]

2024-11-15 Thread via GitHub


MaxGekk commented on code in PR #48858:
URL: https://github.com/apache/spark/pull/48858#discussion_r1843958488


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala:
##
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connector.catalog.{
+  CatalogManager,
+  CatalogV2Util,
+  FunctionCatalog,
+  Identifier,
+  LookupCatalog
+}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.catalog.functions.{
+  AggregateFunction => V2AggregateFunction,
+  ScalarFunction
+}
+import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryCompilationErrors}
+import org.apache.spark.sql.types._
+
+class FunctionResolution(
+override val catalogManager: CatalogManager,
+relationResolution: RelationResolution)
+extends DataTypeErrorsBase with LookupCatalog {
+  private val v1SessionCatalog = catalogManager.v1SessionCatalog
+
+  private val trimWarningEnabled = new AtomicBoolean(true)

Review Comment:
   sgtm
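
For context, an `AtomicBoolean` like `trimWarningEnabled` is typically used as a log-once guard. A minimal sketch of that idiom (assumed usage, not quoted from the patch; the logger setup is illustrative):

```scala
import java.util.concurrent.atomic.AtomicBoolean

import org.slf4j.LoggerFactory

object WarnOnceSketch {
  private val log = LoggerFactory.getLogger(getClass)
  private val trimWarningEnabled = new AtomicBoolean(true)

  def warnDeprecatedTrimSyntaxOnce(): Unit = {
    // getAndSet(false) returns true only for the first caller, so the
    // warning is emitted at most once for this instance.
    if (trimWarningEnabled.getAndSet(false)) {
      log.warn("Two-parameter TRIM/LTRIM/RTRIM function signatures are deprecated.")
    }
  }
}
```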






Re: [PR] [SPARK-50327][SQL] Factor out function resolution to be reused in the single-pass Analyzer [spark]

2024-11-15 Thread via GitHub


MaxGekk commented on PR #48858:
URL: https://github.com/apache/spark/pull/48858#issuecomment-2479096472

   +1, LGTM. Merging to master.
   Thank you, @vladimirg-db.





Re: [PR] [SPARK-50320][CORE] Make `--remote` an official option by removing `experimental` warning [spark]

2024-11-15 Thread via GitHub


dongjoon-hyun closed pull request #48850: [SPARK-50320][CORE] Make `--remote` 
an official option by removing `experimental` warning
URL: https://github.com/apache/spark/pull/48850





Re: [PR] [SPARK-50320][CORE] Make `--remote` an official option by removing `experimental` warning [spark]

2024-11-15 Thread via GitHub


dongjoon-hyun commented on PR #48850:
URL: https://github.com/apache/spark/pull/48850#issuecomment-2479114901

   Thank you, @yaooqinn ! 
   
   Merged to master for Apache Spark 4.0.0 in February 2025.





Re: [PR] [WIP] Subquery only Spark Connect. [spark]

2024-11-15 Thread via GitHub


ueshin closed pull request #48863: [WIP] Subquery only Spark Connect.
URL: https://github.com/apache/spark/pull/48863





[PR] [WIP] Subquery only Spark Connect. [spark]

2024-11-15 Thread via GitHub


ueshin opened a new pull request, #48864:
URL: https://github.com/apache/spark/pull/48864

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]

2024-11-15 Thread via GitHub


ahshahid commented on code in PR #48252:
URL: https://github.com/apache/spark/pull/48252#discussion_r1844712588


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala:
##
@@ -137,8 +137,16 @@ class SparkSession private[sql] (
 
   /** @inheritdoc */
   def createDataFrame(data: java.util.List[_], beanClass: Class[_]): DataFrame 
= {
-val encoder = 
JavaTypeInference.encoderFor(beanClass.asInstanceOf[Class[Any]])
-createDataset(encoder, data.iterator().asScala).toDF()
+JavaTypeInference.setSparkClientFlag()
+val encoderTry = Try {
+  JavaTypeInference.encoderFor(beanClass.asInstanceOf[Class[Any]])
+}
+JavaTypeInference.unsetSparkClientFlag()

Review Comment:
   Sure. The thing is that @hvanhovell mentioned that KryoSerializer is not supported for the spark-connect client.
   If the class for which the encoder is being created implements KryoSerializable, then a Kryo-based encoder is preferred over a Java-serialization-based encoder. But if the thread creating the encoder originates from the Connect client, a Kryo-based encoder should not be created; in such cases a Java-serialization-based encoder is created instead.
   Instead of adding an extra function parameter, the information is passed through a thread local, which is unset once the encoder has been created.
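
   A minimal, self-contained sketch of that handshake (all names and return values are simplified stand-ins; the real flag lives in JavaTypeInference and the real choice is between Kryo- and Java-serialization-backed encoders):

```scala
object EncoderChoiceSketch {
  private val clientConnectFlag: ThreadLocal[Boolean] = new ThreadLocal[Boolean] {
    override def initialValue(): Boolean = false
  }

  def setSparkClientFlag(): Unit = clientConnectFlag.set(true)
  def unsetSparkClientFlag(): Unit = clientConnectFlag.set(false)

  // Kryo is only preferred when the creating thread does not come from the Connect client.
  def chooseEncoder(implementsKryoSerializable: Boolean): String =
    if (implementsKryoSerializable && !clientConnectFlag.get()) "kryo-based encoder"
    else "java-serialization-based encoder"

  def main(args: Array[String]): Unit = {
    setSparkClientFlag()
    try {
      // Prints the Java-serialization choice even though the class prefers Kryo.
      println(chooseEncoder(implementsKryoSerializable = true))
    } finally {
      unsetSparkClientFlag()
    }
  }
}
```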



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]

2024-11-15 Thread via GitHub


ahshahid commented on code in PR #48252:
URL: https://github.com/apache/spark/pull/48252#discussion_r1844719865


##
sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala:
##
@@ -148,34 +163,180 @@ object JavaTypeInference {
   // TODO: we should only collect properties that have getter and setter. 
However, some tests
   //   pass in scala case class as java bean class which doesn't have 
getter and setter.
   val properties = getJavaBeanReadableProperties(c)
-  // add type variables from inheritance hierarchy of the class
-  val classTV = JavaTypeUtils.getTypeArguments(c, 
classOf[Object]).asScala.toMap ++
-typeVariables
-  // Note that the fields are ordered by name.
-  val fields = properties.map { property =>
-val readMethod = property.getReadMethod
-val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet 
+ c, classTV)
-// The existence of `javax.annotation.Nonnull`, means this field is 
not nullable.
-val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
-EncoderField(
-  property.getName,
-  encoder,
-  encoder.nullable && !hasNonNull,
-  Metadata.empty,
-  Option(readMethod.getName),
-  Option(property.getWriteMethod).map(_.getName))
+
+  // if the properties is empty and this is not a top level enclosing 
class, then we
+  // should not consider class as bean, as otherwise it will be treated as 
empty schema
+  // and loose the data on deser.

Review Comment:
   Let's say the top-level class for which the encoder is being created has a field x which is a POJO but has no bean-style getters.
   That means the schema corresponding to field x is empty. So when the Dataset of the top-level class is converted to a DataFrame, there is no representation of x in the Row object.
   When that DataFrame is converted back to a Dataset, the POJO field x is set to null, and the data is lost. It was NOT NULL when we started; it became null because the schema was empty.
   So to handle that case, a POJO without getters should be represented as BinaryType, so that when the DataFrame is converted back, field x gets the deserialized POJO.
   The reason this is not done for the top-level class is that existing tests assert that if the top-level class has no getters, the schema should be empty, implying 0 rows and no schema.
   Whether that is desirable, or whether it should also be represented as a binary type, is debatable, since no meaningful SQL operation can be done on binary data in either case.
   So a distinction is made using the boolean: a top-level class with no getters needs to be treated differently from a field whose type has no getters.
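
   For illustration, a hypothetical pair of classes matching this scenario (names are made up; only the shape matters): `Inner` is Serializable but exposes no bean-style getters, so without the binary fallback its data would be dropped on a Dataset -> DataFrame -> Dataset round trip.

```scala
import java.io.{Serializable => JSerializable}

// Hypothetical field type: Serializable, but with no bean-style getters/setters,
// so its bean schema would otherwise be empty.
class Inner(var payload: String) extends JSerializable {
  def this() = this(null)
}

// Hypothetical top-level bean with a proper bean property of type Inner.
class Outer extends JSerializable {
  private var inner: Inner = _
  def getInner: Inner = inner
  def setInner(value: Inner): Unit = { inner = value }
}

// With the change proposed here, Encoders.bean(classOf[Outer]) would map the
// `inner` property to BinaryType instead of silently dropping it from the schema.
```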
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]

2024-11-15 Thread via GitHub


ahshahid commented on code in PR #48252:
URL: https://github.com/apache/spark/pull/48252#discussion_r1844722744


##
sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala:
##
@@ -148,34 +163,180 @@ object JavaTypeInference {
   // TODO: we should only collect properties that have getter and setter. 
However, some tests
   //   pass in scala case class as java bean class which doesn't have 
getter and setter.
   val properties = getJavaBeanReadableProperties(c)
-  // add type variables from inheritance hierarchy of the class
-  val classTV = JavaTypeUtils.getTypeArguments(c, 
classOf[Object]).asScala.toMap ++
-typeVariables
-  // Note that the fields are ordered by name.
-  val fields = properties.map { property =>
-val readMethod = property.getReadMethod
-val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet 
+ c, classTV)
-// The existence of `javax.annotation.Nonnull`, means this field is 
not nullable.
-val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
-EncoderField(
-  property.getName,
-  encoder,
-  encoder.nullable && !hasNonNull,
-  Metadata.empty,
-  Option(readMethod.getName),
-  Option(property.getWriteMethod).map(_.getName))
+
+  // if the properties is empty and this is not a top level enclosing 
class, then we
+  // should not consider class as bean, as otherwise it will be treated as 
empty schema
+  // and loose the data on deser.
+  if (properties.isEmpty && seenTypeSet.nonEmpty) {
+findBestEncoder(Seq(c), seenTypeSet, typeVariables, None, 
serializableEncodersOnly = true)

Review Comment:
   Will get back on this after going through the code.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]

2024-11-15 Thread via GitHub


ahshahid commented on code in PR #48252:
URL: https://github.com/apache/spark/pull/48252#discussion_r1844722897


##
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala:
##
@@ -2802,6 +2821,79 @@ class DatasetSuite extends QueryTest
   }
 }
   }
+
+  test("SPARK-49789 Bean class encoding with generic type implementing 
Serializable") {
+// just create encoder
+val enc = Encoders.bean(classOf[MessageWrapper[_]])
+val data = Seq("test1", "test2").map(str => {
+  val msg = new MessageWrapper[String]()
+  msg.setMessage(str)
+  msg
+})
+validateParamBeanDataset(classOf[MessageWrapper[String]],
+  data, mutable.Buffer(data: _*),
+  StructType(Seq(StructField("message", BinaryType, true)))
+)
+  }
+
+  test("SPARK-49789 Bean class encoding with  generic type indirectly 
extending" +
+" Serializable class") {
+// just create encoder
+Encoders.bean(classOf[BigDecimalMessageWrapper[_]])
+val data = Seq(2d, 8d).map(doub => {
+  val bean = new BigDecimalMessageWrapper[DerivedBigDecimalExtender]()
+  bean.setMessage(new DerivedBigDecimalExtender(doub))
+  bean
+})
+validateParamBeanDataset(
+  classOf[BigDecimalMessageWrapper[DerivedBigDecimalExtender]],
+  data, mutable.Buffer(data: _*),
+  StructType(Seq(StructField("message", BinaryType, true))))
+  }
+
+  test("SPARK-49789. test bean class with  generictype bound of UDTType") {
+// just create encoder
+UDTRegistration.register(classOf[TestUDT].getName, 
classOf[TestUDTType].getName)
+val enc = Encoders.bean(classOf[UDTBean[_]])
+val baseData = Seq((1, "a"), (2, "b"))
+val data = baseData.map(tup => {
+  val bean = new UDTBean[TestUDT]()
+  bean.setMessage(new TestUDTImplSub(tup._1, tup._2))
+  bean
+})
+val expectedData = baseData.map(tup => {
+  val bean = new UDTBean[TestUDT]()
+  bean.setMessage(new TestUDTImpl(tup._1, tup._2))
+  bean
+})
+validateParamBeanDataset(
+  classOf[UDTBean[TestUDT]],
+  data, mutable.Buffer(expectedData: _*),
+  StructType(Seq(StructField("message", new TestUDTType(), true))))
+  }
+
+  private def validateParamBeanDataset[T](
+   classToEncode: Class[T],
+   data: Seq[T],
+   expectedData: mutable.Buffer[T],
+   expectedSchema: StructType): Unit = 
{
+

Review Comment:
   thanks. will do that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [MINOR] Use LinkedHashSet for ResolveLateralColumnAliasReference to generate stable hash for the plan [spark]

2024-11-15 Thread via GitHub


github-actions[bot] commented on PR #47571:
URL: https://github.com/apache/spark/pull/47571#issuecomment-2480202800

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Enhance the metrics in SparkUI with logical plan stats [spark]

2024-11-15 Thread via GitHub


github-actions[bot] closed pull request #47534: Enhance the metrics in SparkUI 
with logical plan stats
URL: https://github.com/apache/spark/pull/47534


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Fixed comma splice in cluster-overview.md [spark]

2024-11-15 Thread via GitHub


github-actions[bot] commented on PR #47615:
URL: https://github.com/apache/spark/pull/47615#issuecomment-2480202795

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]

2024-11-15 Thread via GitHub


ahshahid commented on code in PR #48252:
URL: https://github.com/apache/spark/pull/48252#discussion_r1844724224


##
sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala:
##
@@ -148,34 +163,180 @@ object JavaTypeInference {
   // TODO: we should only collect properties that have getter and setter. 
However, some tests
   //   pass in scala case class as java bean class which doesn't have 
getter and setter.
   val properties = getJavaBeanReadableProperties(c)
-  // add type variables from inheritance hierarchy of the class
-  val classTV = JavaTypeUtils.getTypeArguments(c, 
classOf[Object]).asScala.toMap ++
-typeVariables
-  // Note that the fields are ordered by name.
-  val fields = properties.map { property =>
-val readMethod = property.getReadMethod
-val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet 
+ c, classTV)
-// The existence of `javax.annotation.Nonnull`, means this field is 
not nullable.
-val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
-EncoderField(
-  property.getName,
-  encoder,
-  encoder.nullable && !hasNonNull,
-  Metadata.empty,
-  Option(readMethod.getName),
-  Option(property.getWriteMethod).map(_.getName))
+
+  // if the properties is empty and this is not a top level enclosing 
class, then we
+  // should not consider class as bean, as otherwise it will be treated as 
empty schema
+  // and loose the data on deser.
+  if (properties.isEmpty && seenTypeSet.nonEmpty) {
+findBestEncoder(Seq(c), seenTypeSet, typeVariables, None, 
serializableEncodersOnly = true)
+  .getOrElse(throw 
ExecutionErrors.cannotFindEncoderForTypeError(t.getTypeName))
+  } else {
+// add type variables from inheritance hierarchy of the class
+val parentClassesTypeMap =
+  JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap
+val classTV = parentClassesTypeMap ++ typeVariables
+// Note that the fields are ordered by name.
+val fields = properties.map { property =>
+  val readMethod = property.getReadMethod
+  val methodReturnType = readMethod.getGenericReturnType
+  val encoder = encoderFor(methodReturnType, seenTypeSet + c, classTV)
+  // The existence of `javax.annotation.Nonnull`, means this field is 
not nullable.
+  val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
+  EncoderField(
+property.getName,
+encoder,
+encoder.nullable && !hasNonNull,
+Metadata.empty,
+Option(readMethod.getName),
+Option(property.getWriteMethod).map(_.getName))
+}
+// implies it cannot be assumed a BeanClass.
+// Check if its super class or interface could be represented by an 
Encoder

Review Comment:
   let me go through. will get back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48835] Introduce versoning to jdbc connectors [spark]

2024-11-15 Thread via GitHub


github-actions[bot] commented on PR #47181:
URL: https://github.com/apache/spark/pull/47181#issuecomment-2480202821

   We're closing this PR because it hasn't been updated in a while. This isn't 
a judgement on the merit of the PR in any way. It's just a way of keeping the 
PR queue manageable.
   If you'd like to revive this PR, please reopen it and ask a committer to 
remove the Stale tag!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-15 Thread via GitHub


bogao007 commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1844724391


##
python/pyspark/sql/streaming/stateful_processor.py:
##
@@ -420,10 +411,27 @@ def handleInputRows(
 timer_values: TimerValues
   Timer value for the current batch that process the input 
rows.
   Users can get the processing or event time timestamp 
from TimerValues.
+"""
+return iter([])

Review Comment:
   Why do we change the `...` placeholder here?



##
python/pyspark/sql/streaming/stateful_processor.py:
##
@@ -420,10 +411,27 @@ def handleInputRows(
 timer_values: TimerValues
   Timer value for the current batch that process the input 
rows.
   Users can get the processing or event time timestamp 
from TimerValues.
+"""
+return iter([])
+
+def handleExpiredTimer(

Review Comment:
   Just double-checking that this method is not required for users to implement, correct?



##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -573,7 +568,16 @@ def transformWithStateUDF(
 
statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.CLOSED)
 return iter([])
 
-result = handle_data_with_timers(statefulProcessorApiClient, key, 
inputRows)
+if timeMode != "none":
+batch_timestamp = 
statefulProcessorApiClient.get_batch_timestamp()
+watermark_timestamp = 
statefulProcessorApiClient.get_watermark_timestamp()
+else:
+batch_timestamp = -1
+watermark_timestamp = -1

Review Comment:
   Can we abstract this into a separate method and share it in both UDFs to reduce redundant code?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]

2024-11-15 Thread via GitHub


ahshahid commented on code in PR #48252:
URL: https://github.com/apache/spark/pull/48252#discussion_r1844723692


##
sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala:
##
@@ -2909,6 +3016,7 @@ object KryoData {
 
 /** Used to test Java encoder. */
 class JavaData(val a: Int) extends Serializable {
+  def this() = this(0)

Review Comment:
   For the serializer to work correctly, any class which is Serializable needs to have a default no-argument constructor.
   Otherwise there will be a runtime exception.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49676][SS][PYTHON] Add Support for Chaining of Operators in transformWithStateInPandas API [spark]

2024-11-15 Thread via GitHub


jingz-db commented on PR #48124:
URL: https://github.com/apache/spark/pull/48124#issuecomment-2480233401

   Hey @HeartSaVioR, could you take another look? Thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48769][SQL] Support constant folding for ScalaUDF [spark]

2024-11-15 Thread via GitHub


github-actions[bot] closed pull request #47164: [SPARK-48769][SQL] Support 
constant folding for ScalaUDF
URL: https://github.com/apache/spark/pull/47164


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]

2024-11-15 Thread via GitHub


ahshahid commented on code in PR #48252:
URL: https://github.com/apache/spark/pull/48252#discussion_r1844713507


##
sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala:
##
@@ -148,34 +163,180 @@ object JavaTypeInference {
   // TODO: we should only collect properties that have getter and setter. 
However, some tests
   //   pass in scala case class as java bean class which doesn't have 
getter and setter.
   val properties = getJavaBeanReadableProperties(c)
-  // add type variables from inheritance hierarchy of the class
-  val classTV = JavaTypeUtils.getTypeArguments(c, 
classOf[Object]).asScala.toMap ++
-typeVariables
-  // Note that the fields are ordered by name.
-  val fields = properties.map { property =>
-val readMethod = property.getReadMethod
-val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet 
+ c, classTV)
-// The existence of `javax.annotation.Nonnull`, means this field is 
not nullable.
-val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
-EncoderField(
-  property.getName,
-  encoder,
-  encoder.nullable && !hasNonNull,
-  Metadata.empty,
-  Option(readMethod.getName),
-  Option(property.getWriteMethod).map(_.getName))
+
+  // if the properties is empty and this is not a top level enclosing 
class, then we
+  // should not consider class as bean, as otherwise it will be treated as 
empty schema
+  // and loose the data on deser.
+  if (properties.isEmpty && seenTypeSet.nonEmpty) {
+findBestEncoder(Seq(c), seenTypeSet, typeVariables, None, 
serializableEncodersOnly = true)
+  .getOrElse(throw 
ExecutionErrors.cannotFindEncoderForTypeError(t.getTypeName))
+  } else {
+// add type variables from inheritance hierarchy of the class
+val parentClassesTypeMap =
+  JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap
+val classTV = parentClassesTypeMap ++ typeVariables
+// Note that the fields are ordered by name.
+val fields = properties.map { property =>
+  val readMethod = property.getReadMethod
+  val methodReturnType = readMethod.getGenericReturnType
+  val encoder = encoderFor(methodReturnType, seenTypeSet + c, classTV)
+  // The existence of `javax.annotation.Nonnull`, means this field is 
not nullable.
+  val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
+  EncoderField(
+property.getName,
+encoder,
+encoder.nullable && !hasNonNull,
+Metadata.empty,
+Option(readMethod.getName),
+Option(property.getWriteMethod).map(_.getName))
+}
+// implies it cannot be assumed a BeanClass.
+// Check if its super class or interface could be represented by an 
Encoder
+
+JavaBeanEncoder(ClassTag(c), fields.toImmutableArraySeq)
   }
-  JavaBeanEncoder(ClassTag(c), fields.toImmutableArraySeq)
 
 case _ =>
   throw ExecutionErrors.cannotFindEncoderForTypeError(t.toString)
   }
 
+  private def createUDTEncoderUsingAnnotation(c: Class[_]): UDTEncoder[Any] = {
+val udt = c
+  .getAnnotation(classOf[SQLUserDefinedType])
+  .udt()
+  .getConstructor()
+  .newInstance()
+  .asInstanceOf[UserDefinedType[Any]]
+val udtClass = 
udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt()
+UDTEncoder(udt, udtClass)
+  }
+
+  private def createUDTEncoderUsingRegistration(c: Class[_]): UDTEncoder[Any] 
= {
+val udt = UDTRegistration
+  .getUDTFor(c.getName)
+  .get
+  .getConstructor()
+  .newInstance()
+  .asInstanceOf[UserDefinedType[Any]]
+UDTEncoder(udt, udt.getClass)
+  }
+
   def getJavaBeanReadableProperties(beanClass: Class[_]): 
Array[PropertyDescriptor] = {
 val beanInfo = Introspector.getBeanInfo(beanClass)
 beanInfo.getPropertyDescriptors
   .filterNot(_.getName == "class")
   .filterNot(_.getName == "declaringClass")
   .filter(_.getReadMethod != null)
   }
+
+  private def findBestEncoder(
+  typesToCheck: Seq[Class[_]],
+  seenTypeSet: Set[Class[_]],
+  typeVariables: Map[TypeVariable[_], Type],
+  baseClass: Option[Class[_]],
+  serializableEncodersOnly: Boolean = false): Option[AgnosticEncoder[_]] =
+if (serializableEncodersOnly) {
+  val isClientConnect = clientConnectFlag.get
+  assert(typesToCheck.size == 1)
+  typesToCheck
+.flatMap(c => {
+  if (!isClientConnect && 
classOf[KryoSerializable].isAssignableFrom(c)) {

Review Comment:
   Will do. The explanation is provided in the response to the previous comment.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: rev

Re: [PR] [SPARK-50324][PYTHON][CONNECT] Make `createDataFrame` trigger `Config` RPC at most once [spark]

2024-11-15 Thread via GitHub


HyukjinKwon commented on code in PR #48856:
URL: https://github.com/apache/spark/pull/48856#discussion_r1844745641


##
python/pyspark/sql/connect/session.py:
##
@@ -706,9 +724,9 @@ def createDataFrame(
 else:
 local_relation = LocalRelation(_table)
 
-cache_threshold = 
self._client.get_configs("spark.sql.session.localRelationCacheThreshold")
+cache_threshold = 
conf_getter["spark.sql.session.localRelationCacheThreshold"]

Review Comment:
   Shall we just get all the confs in batch eagerly? Seems like we should get 
the conf once anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50324][PYTHON][CONNECT] Make `createDataFrame` trigger `Config` RPC at most once [spark]

2024-11-15 Thread via GitHub


zhengruifeng commented on code in PR #48856:
URL: https://github.com/apache/spark/pull/48856#discussion_r1844867144


##
python/pyspark/sql/connect/session.py:
##
@@ -706,9 +724,9 @@ def createDataFrame(
 else:
 local_relation = LocalRelation(_table)
 
-cache_threshold = 
self._client.get_configs("spark.sql.session.localRelationCacheThreshold")
+cache_threshold = 
conf_getter["spark.sql.session.localRelationCacheThreshold"]

Review Comment:
   There are 2 cases where we don't need the configs:
   1, the local data is empty and the schema is specified, so it returns a valid empty df;
   2, the creation fails due to some assertions



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50130][SQL][FOLLOWUP] Make Encoder generation lazy [spark]

2024-11-15 Thread via GitHub


ueshin commented on code in PR #48829:
URL: https://github.com/apache/spark/pull/48829#discussion_r1844320464


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -95,13 +95,12 @@ private[sql] object Dataset {
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame =
 sparkSession.withActive {
   val qe = sparkSession.sessionState.executePlan(logicalPlan)
-  val encoder = if (qe.isLazyAnalysis) {
-RowEncoder.encoderFor(new StructType())
+  if (qe.isLazyAnalysis) {

Review Comment:
   The lazy encoder creation only happens when `qe.isLazyAnalysis`; otherwise the encoder is created right here, the same as before, to avoid changing the behavior. `Dataset` has a new default constructor that takes a function for the lazy case, and the original constructor is still there to keep the existing behavior.
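
   For illustration, a rough, self-contained sketch of that control flow (stand-in types only, not the real `Dataset`/`ofRows` signatures): the encoder is built eagerly unless analysis is lazy, in which case only the generator function is handed over.

```scala
object OfRowsSketch {
  type FakeEncoder = String // stand-in for the real encoder type

  def ofRows(isLazyAnalysis: Boolean, buildEncoder: () => FakeEncoder): () => FakeEncoder =
    if (isLazyAnalysis) {
      // Lazy case: pass the generator through; the encoder is created on first use.
      buildEncoder
    } else {
      // Eager case: build it right here, exactly as before.
      val encoder = buildEncoder()
      () => encoder
    }

  def main(args: Array[String]): Unit = {
    val eager = ofRows(isLazyAnalysis = false, () => "encoder built eagerly")
    val deferred = ofRows(isLazyAnalysis = true, () => "encoder built on demand")
    println(eager())
    println(deferred())
  }
}
```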



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50017] Support Avro encoding for TransformWithState operator [spark]

2024-11-15 Thread via GitHub


brkyvz commented on code in PR #48401:
URL: https://github.com/apache/spark/pull/48401#discussion_r1844378796


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -343,7 +497,8 @@ class StatefulProcessorHandleImpl(
  * actually done. We need this class because we can only collect the schemas 
after
  * the StatefulProcessor is initialized.
  */
-class DriverStatefulProcessorHandleImpl(timeMode: TimeMode, keyExprEnc: 
ExpressionEncoder[Any])
+class DriverStatefulProcessorHandleImpl(
+timeMode: TimeMode, keyExprEnc: ExpressionEncoder[Any], initializeAvroEnc: 
Boolean)

Review Comment:
   nit: one line per parameter please



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -718,6 +733,22 @@ object TransformWithStateExec {
   stateStoreCkptIds = None
 )
 
+val stateStoreEncoding = child.session.sessionState.conf.getConf(
+  SQLConf.STREAMING_STATE_STORE_ENCODING_FORMAT
+)
+
+def getDriverProcessorHandle(): DriverStatefulProcessorHandleImpl = {
+  val driverProcessorHandle = new DriverStatefulProcessorHandleImpl(
+timeMode, keyEncoder, initializeAvroEnc =
+ avroEncodingEnabled(stateStoreEncoding))
+  
driverProcessorHandle.setHandleState(StatefulProcessorHandleState.PRE_INIT)
+  statefulProcessor.setHandle(driverProcessorHandle)
+  statefulProcessor.init(outputMode, timeMode)
+  driverProcessorHandle
+}
+
+val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas

Review Comment:
   nit: maybe if you add a `withColumnFamilySchema` method, you can remove the 
need for this duplication and can just call it below after creating the class
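
   Something along these lines, as a rough sketch with stand-in types (the real `TransformWithStateExec` has many more constructor parameters; the helper name is the one suggested above):

```scala
final case class ColFamilySchemaSketch(name: String)

final case class TwsExecSketch(
    timeMode: String,
    columnFamilySchemas: Map[String, ColFamilySchemaSketch] = Map.empty) {

  // Copy-style helper: attach the schemas after construction instead of
  // computing the driver handle twice.
  def withColumnFamilySchema(schemas: Map[String, ColFamilySchemaSketch]): TwsExecSketch =
    copy(columnFamilySchemas = schemas)
}

object TwsExecSketch {
  def main(args: Array[String]): Unit = {
    val schemas = Map("countState" -> ColFamilySchemaSketch("countState"))
    val exec = TwsExecSketch(timeMode = "ProcessingTime").withColumnFamilySchema(schemas)
    println(exec.columnFamilySchemas.keySet) // Set(countState)
  }
}
```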



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -104,7 +106,9 @@ case class TransformWithStateExec(
* @return a new instance of the driver processor handle
*/
   private def getDriverProcessorHandle(): DriverStatefulProcessorHandleImpl = {

Review Comment:
   nit: should this method have an assertion that it is being called on the 
Driver?



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -596,7 +601,8 @@ case class TransformWithStateExec(
   private def initNewStateStoreAndProcessData(
   partitionId: Int,
   hadoopConfBroadcast: Broadcast[SerializableConfiguration])
-(f: StateStore => CompletionIterator[InternalRow, Iterator[InternalRow]]):
+(f: StateStore =>
+  CompletionIterator[InternalRow, Iterator[InternalRow]]):

Review Comment:
   also nit: should we use type aliasing to shorten this 
`CompletionIterator[InternalRow, Iterator[InternalRow]]`? Like `type 
ResultIterator = CompletionIterator[InternalRow, Iterator[InternalRow]]`



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StatefulProcessorHandleImpl.scala:
##
@@ -118,6 +122,14 @@ class StatefulProcessorHandleImpl(
 
   currState = CREATED
 
+  private def getAvroEnc(stateName: String): Option[AvroEncoder] = {
+if (!schemas.contains(stateName)) {
+  None
+} else {
+  schemas(stateName).avroEnc
+}

Review Comment:
   `schemas.get(stateName).flatMap(_.avroEnc)`?



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -596,7 +601,8 @@ case class TransformWithStateExec(
   private def initNewStateStoreAndProcessData(
   partitionId: Int,
   hadoopConfBroadcast: Broadcast[SerializableConfiguration])
-(f: StateStore => CompletionIterator[InternalRow, Iterator[InternalRow]]):
+(f: StateStore =>
+  CompletionIterator[InternalRow, Iterator[InternalRow]]):

Review Comment:
   uber nit: change necessary?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49789][SQL] Handling of generic parameter with bounds while creating encoders [spark]

2024-11-15 Thread via GitHub


sririshindra commented on code in PR #48252:
URL: https://github.com/apache/spark/pull/48252#discussion_r1844241460


##
sql/api/src/main/scala/org/apache/spark/sql/catalyst/JavaTypeInference.scala:
##
@@ -148,34 +163,180 @@ object JavaTypeInference {
   // TODO: we should only collect properties that have getter and setter. 
However, some tests
   //   pass in scala case class as java bean class which doesn't have 
getter and setter.
   val properties = getJavaBeanReadableProperties(c)
-  // add type variables from inheritance hierarchy of the class
-  val classTV = JavaTypeUtils.getTypeArguments(c, 
classOf[Object]).asScala.toMap ++
-typeVariables
-  // Note that the fields are ordered by name.
-  val fields = properties.map { property =>
-val readMethod = property.getReadMethod
-val encoder = encoderFor(readMethod.getGenericReturnType, seenTypeSet 
+ c, classTV)
-// The existence of `javax.annotation.Nonnull`, means this field is 
not nullable.
-val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
-EncoderField(
-  property.getName,
-  encoder,
-  encoder.nullable && !hasNonNull,
-  Metadata.empty,
-  Option(readMethod.getName),
-  Option(property.getWriteMethod).map(_.getName))
+
+  // if the properties is empty and this is not a top level enclosing 
class, then we
+  // should not consider class as bean, as otherwise it will be treated as 
empty schema
+  // and loose the data on deser.
+  if (properties.isEmpty && seenTypeSet.nonEmpty) {
+findBestEncoder(Seq(c), seenTypeSet, typeVariables, None, 
serializableEncodersOnly = true)
+  .getOrElse(throw 
ExecutionErrors.cannotFindEncoderForTypeError(t.getTypeName))
+  } else {
+// add type variables from inheritance hierarchy of the class
+val parentClassesTypeMap =
+  JavaTypeUtils.getTypeArguments(c, classOf[Object]).asScala.toMap
+val classTV = parentClassesTypeMap ++ typeVariables
+// Note that the fields are ordered by name.
+val fields = properties.map { property =>
+  val readMethod = property.getReadMethod
+  val methodReturnType = readMethod.getGenericReturnType
+  val encoder = encoderFor(methodReturnType, seenTypeSet + c, classTV)
+  // The existence of `javax.annotation.Nonnull`, means this field is 
not nullable.
+  val hasNonNull = readMethod.isAnnotationPresent(classOf[Nonnull])
+  EncoderField(
+property.getName,
+encoder,
+encoder.nullable && !hasNonNull,
+Metadata.empty,
+Option(readMethod.getName),
+Option(property.getWriteMethod).map(_.getName))
+}
+// implies it cannot be assumed a BeanClass.
+// Check if its super class or interface could be represented by an 
Encoder
+
+JavaBeanEncoder(ClassTag(c), fields.toImmutableArraySeq)
   }
-  JavaBeanEncoder(ClassTag(c), fields.toImmutableArraySeq)
 
 case _ =>
   throw ExecutionErrors.cannotFindEncoderForTypeError(t.toString)
   }
 
+  private def createUDTEncoderUsingAnnotation(c: Class[_]): UDTEncoder[Any] = {
+val udt = c
+  .getAnnotation(classOf[SQLUserDefinedType])
+  .udt()
+  .getConstructor()
+  .newInstance()
+  .asInstanceOf[UserDefinedType[Any]]
+val udtClass = 
udt.userClass.getAnnotation(classOf[SQLUserDefinedType]).udt()
+UDTEncoder(udt, udtClass)
+  }
+
+  private def createUDTEncoderUsingRegistration(c: Class[_]): UDTEncoder[Any] 
= {
+val udt = UDTRegistration
+  .getUDTFor(c.getName)
+  .get
+  .getConstructor()
+  .newInstance()
+  .asInstanceOf[UserDefinedType[Any]]
+UDTEncoder(udt, udt.getClass)
+  }
+
   def getJavaBeanReadableProperties(beanClass: Class[_]): 
Array[PropertyDescriptor] = {
 val beanInfo = Introspector.getBeanInfo(beanClass)
 beanInfo.getPropertyDescriptors
   .filterNot(_.getName == "class")
   .filterNot(_.getName == "declaringClass")
   .filter(_.getReadMethod != null)
   }
+
+  private def findBestEncoder(
+  typesToCheck: Seq[Class[_]],
+  seenTypeSet: Set[Class[_]],
+  typeVariables: Map[TypeVariable[_], Type],
+  baseClass: Option[Class[_]],
+  serializableEncodersOnly: Boolean = false): Option[AgnosticEncoder[_]] =
+if (serializableEncodersOnly) {
+  val isClientConnect = clientConnectFlag.get
+  assert(typesToCheck.size == 1)
+  typesToCheck
+.flatMap(c => {
+  if (!isClientConnect && 
classOf[KryoSerializable].isAssignableFrom(c)) {

Review Comment:
   Can we add a comment explaining why isClientConnect being true disqualifies the type from being encoded using the Kryo serializer? Is there a chance that will change in the future? If so, can we add a TODO statement here so that we can remove this condition if and when Kr

Re: [PR] [WIP] Subquery only Spark Connect. [spark]

2024-11-15 Thread via GitHub


ueshin closed pull request #48864: [WIP] Subquery only Spark Connect.
URL: https://github.com/apache/spark/pull/48864


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SQL][SPARK-50329] fix InSet$toString [spark]

2024-11-15 Thread via GitHub


averyqi-db opened a new pull request, #48865:
URL: https://github.com/apache/spark/pull/48865

   
   
   ### What changes were proposed in this pull request?
   
   Fix InSet$toString for an unresolved plan node
   
   
   ### Why are the changes needed?
   
   InSet$toString should always work, even for an unresolved node
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   
   ### How was this patch tested?
   
   End-to-end test by running the TPCDS benchmark suite with planChangeLog enabled.
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50329][SQL] fix InSet$toString [spark]

2024-11-15 Thread via GitHub


HyukjinKwon commented on code in PR #48865:
URL: https://github.com/apache/spark/pull/48865#discussion_r1844738816


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/predicates.scala:
##
@@ -609,6 +609,9 @@ case class InSet(child: Expression, hset: Set[Any]) extends 
UnaryExpression with
   require(hset != null, "hset could not be null")
 
   override def toString: String = {
+if (!child.resolved) {
+  return s"$child INSET (values with unresolved data types)"
+}

Review Comment:
   can we have a simple test tho?
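
   Not prescribing where it should live, but a minimal check along these lines would do; it assumes the fix in this PR, which makes `toString` mention unresolved data types instead of failing:

```scala
import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
import org.apache.spark.sql.catalyst.expressions.InSet

object InSetToStringSketch {
  def main(args: Array[String]): Unit = {
    // The child is still unresolved, so toString must not throw.
    val inSet = InSet(UnresolvedAttribute("a"), Set[Any](1, 2, 3))
    assert(!inSet.child.resolved)
    assert(inSet.toString.contains("unresolved"))
    println(inSet.toString)
  }
}
```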



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50309][DOCS] Document `SQL Pipe` Syntax [spark]

2024-11-15 Thread via GitHub


gengliangwang closed pull request #48852: [SPARK-50309][DOCS] Document `SQL 
Pipe` Syntax
URL: https://github.com/apache/spark/pull/48852


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] Subquery only Spark Connect. [spark]

2024-11-15 Thread via GitHub


ueshin opened a new pull request, #48863:
URL: https://github.com/apache/spark/pull/48863

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49787][SQL] Cast between UDT and other types [spark]

2024-11-15 Thread via GitHub


HyukjinKwon commented on PR #48251:
URL: https://github.com/apache/spark/pull/48251#issuecomment-2480360796

   Actually, I wanted to make a fix like this a long time ago, and gave up after reading the ANSI spec because, IIRC, a UDT cannot be cast to any type according to it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843587760


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##
@@ -2219,21 +2219,24 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
 numArgs: Int,
 u: UnresolvedFunction): Expression = {
   func match {
-case owg: SupportsOrderingWithinGroup if u.isDistinct =>
-  throw 
QueryCompilationErrors.distinctInverseDistributionFunctionUnsupportedError(
-owg.prettyName)
+case owg: SupportsOrderingWithinGroup if !owg.isDistinctSupported && 
u.isDistinct =>
+throw 
QueryCompilationErrors.distinctWithOrderingFunctionUnsupportedError(
+  owg.prettyName)
 case owg: SupportsOrderingWithinGroup
-  if !owg.orderingFilled && u.orderingWithinGroup.isEmpty =>
-  throw 
QueryCompilationErrors.inverseDistributionFunctionMissingWithinGroupError(
-owg.prettyName)
-case owg: SupportsOrderingWithinGroup
-  if owg.orderingFilled && u.orderingWithinGroup.nonEmpty =>
-  throw 
QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+  if owg.isOrderingMandatory && !owg.orderingFilled && 
u.orderingWithinGroup.isEmpty =>
+throw 
QueryCompilationErrors.functionMissingWithinGroupError(owg.prettyName)
+case owg: Mode if owg.orderingFilled && u.orderingWithinGroup.nonEmpty 
=>

Review Comment:
   Why do we only check `Mode` now? Isn't it a general check?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843590480


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala:
##
@@ -20,9 +20,28 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.catalyst.expressions.SortOrder
 
 /**
- * The trait used to set the [[SortOrder]] after inverse distribution 
functions parsed.
+ * The trait used to set the [[SortOrder]] for supporting functions.
  */
-trait SupportsOrderingWithinGroup { self: AggregateFunction =>

Review Comment:
   why remove `self: AggregateFunction =>`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843591341


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala:
##
@@ -20,9 +20,28 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.catalyst.expressions.SortOrder
 
 /**
- * The trait used to set the [[SortOrder]] after inverse distribution 
functions parsed.
+ * The trait used to set the [[SortOrder]] for supporting functions.
  */
-trait SupportsOrderingWithinGroup { self: AggregateFunction =>
-  def orderingFilled: Boolean = false
+trait SupportsOrderingWithinGroup {
   def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): 
AggregateFunction
+
+  /** Indicator that ordering was set. */
+  def orderingFilled: Boolean
+
+  /**
+   * Tells Analyzer that WITHIN GROUP (ORDER BY ...) is mandatory for function.
+   *
+   * @see [[QueryCompilationErrors.functionMissingWithinGroupError]]
+   * @see [[org.apache.spark.sql.catalyst.analysis.Analyzer]]

Review Comment:
   this isn't very useful as Analyzer has thousands of lines of code...



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala:
##
@@ -20,9 +20,28 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.catalyst.expressions.SortOrder
 
 /**
- * The trait used to set the [[SortOrder]] after inverse distribution 
functions parsed.
+ * The trait used to set the [[SortOrder]] for supporting functions.
  */
-trait SupportsOrderingWithinGroup { self: AggregateFunction =>
-  def orderingFilled: Boolean = false
+trait SupportsOrderingWithinGroup {
   def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): 
AggregateFunction
+
+  /** Indicator that ordering was set. */
+  def orderingFilled: Boolean
+
+  /**
+   * Tells Analyzer that WITHIN GROUP (ORDER BY ...) is mandatory for function.
+   *
+   * @see [[QueryCompilationErrors.functionMissingWithinGroupError]]
+   * @see [[org.apache.spark.sql.catalyst.analysis.Analyzer]]
+   */
+  def isOrderingMandatory: Boolean
+
+  /**
+   * Tells Analyzer that DISTINCT is supported.
+   * The DISTINCT can conflict with order so some functions can ban it.
+   *
+   * @see [[QueryCompilationErrors.functionMissingWithinGroupError]]
+   * @see [[org.apache.spark.sql.catalyst.analysis.Analyzer]]

Review Comment:
   ditto



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50327][SQL] Factor out function resolution to be reused in the single-pass Analyzer [spark]

2024-11-15 Thread via GitHub


vladimirg-db commented on code in PR #48858:
URL: https://github.com/apache/spark/pull/48858#discussion_r1843505454


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionResolution.scala:
##
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.analysis
+
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.spark.sql.AnalysisException
+import org.apache.spark.sql.catalyst.FunctionIdentifier
+import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.catalyst.expressions.aggregate._
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.connector.catalog.{
+  CatalogManager,
+  CatalogV2Util,
+  FunctionCatalog,
+  Identifier,
+  LookupCatalog
+}
+import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
+import org.apache.spark.sql.connector.catalog.functions.{
+  AggregateFunction => V2AggregateFunction,
+  ScalarFunction
+}
+import org.apache.spark.sql.errors.{DataTypeErrorsBase, QueryCompilationErrors}
+import org.apache.spark.sql.types._
+
+class FunctionResolution(
+override val catalogManager: CatalogManager,
+relationResolution: RelationResolution)
+extends DataTypeErrorsBase with LookupCatalog {
+  private val v1SessionCatalog = catalogManager.v1SessionCatalog
+
+  private val trimWarningEnabled = new AtomicBoolean(true)

Review Comment:
   Since this atomic boolean was in `object FunctionResolution` _inside_ `class Analyzer`, we have to move it to `class FunctionResolution`, and not to a global `object FunctionResolution`. @MaxGekk please correct me if I'm wrong.
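
   For illustration, a small sketch of the distinction (simplified, stand-in logic): a flag held by the class stays per-instance, matching the old nested object (which was tied to its enclosing `Analyzer` instance), whereas a top-level `object` would make it process-wide.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Per-instance flag: mirrors keeping the state inside `class FunctionResolution`.
class FunctionResolutionSketch {
  private val trimWarningEnabled = new AtomicBoolean(true)
  def shouldWarnAboutTrim(): Boolean = trimWarningEnabled.getAndSet(false)
}

object FunctionResolutionSketch {
  def main(args: Array[String]): Unit = {
    val a = new FunctionResolutionSketch
    val b = new FunctionResolutionSketch
    // Each instance warns once; a top-level object would have warned once per JVM.
    println(Seq(a.shouldWarnAboutTrim(), a.shouldWarnAboutTrim(), b.shouldWarnAboutTrim()))
  }
}
```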



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [MINOR] Fix code style for if/for/while statements [spark]

2024-11-15 Thread via GitHub


MaxGekk commented on PR #48425:
URL: https://github.com/apache/spark/pull/48425#issuecomment-2478527669

   +1, LGTM. Merging to master.
   Thank you, @exmy.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


mikhailnik-db commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843628058


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala:
##
@@ -2219,21 +2219,24 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
 numArgs: Int,
 u: UnresolvedFunction): Expression = {
   func match {
-case owg: SupportsOrderingWithinGroup if u.isDistinct =>
-  throw 
QueryCompilationErrors.distinctInverseDistributionFunctionUnsupportedError(
-owg.prettyName)
+case owg: SupportsOrderingWithinGroup if !owg.isDistinctSupported && 
u.isDistinct =>
+throw 
QueryCompilationErrors.distinctWithOrderingFunctionUnsupportedError(
+  owg.prettyName)
 case owg: SupportsOrderingWithinGroup
-  if !owg.orderingFilled && u.orderingWithinGroup.isEmpty =>
-  throw 
QueryCompilationErrors.inverseDistributionFunctionMissingWithinGroupError(
-owg.prettyName)
-case owg: SupportsOrderingWithinGroup
-  if owg.orderingFilled && u.orderingWithinGroup.nonEmpty =>
-  throw 
QueryCompilationErrors.wrongNumOrderingsForInverseDistributionFunctionError(
+  if owg.isOrderingMandatory && !owg.orderingFilled && 
u.orderingWithinGroup.isEmpty =>
+throw 
QueryCompilationErrors.functionMissingWithinGroupError(owg.prettyName)
+case owg: Mode if owg.orderingFilled && u.orderingWithinGroup.nonEmpty 
=>

Review Comment:
   I described this case above:
   > Plus, this is a strange case, because the actual check on the number of order expressions happens in the withOrderingWithinGroup(...) implementation. This case can only match the Mode function, when the mode(expr) within group (order by expr) syntax is used. That's because mode(expr) is equivalent to mode() within group (order by expr), and here we actually just ban using the two syntaxes simultaneously. I think the code and error message are quite misleading.
   
   If `owg.orderingFilled` is true before `withOrderingWithinGroup(u.orderingWithinGroup)` has been called, it means that the implementation does some hack to fill it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50295][INFRA] Add a script to build docs with image [spark]

2024-11-15 Thread via GitHub


pan3793 commented on code in PR #48860:
URL: https://github.com/apache/spark/pull/48860#discussion_r1843718476


##
dev/spark-test-image/docs/build-docs-on-local:
##
@@ -0,0 +1,68 @@
+#!/usr/bin/env bash
+
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+if ! [ -x "$(command -v docker)" ]; then
+  echo "Error: Docker is not installed." >&2
+  exit 1
+fi
+
+DOCKER_CACHE_IMG="ghcr.io/apache/spark/apache-spark-github-action-image-docs-cache:master"
+REPO_OWNER="apache/spark"
+IMG_TAG=$(date +%s)
+IMG_NAME="apache-spark-ci-image-docs:${IMG_TAG}"
+IMG_URL="$REPO_OWNER/$IMG_NAME"
+DOCKER_MOUNT_SPARK_HOME="/__w/spark/spark"
+BUILD_DOCS_SCRIPT_PATH="${DOCKER_MOUNT_SPARK_HOME}/dev/spark-test-image/docs/build-docs-in-container"
+
+FWDIR="$(cd "`dirname "${BASH_SOURCE[0]}"`"; pwd)"
+SPARK_HOME="$(cd "`dirname "${BASH_SOURCE[0]}"`"/../../..; pwd)"
+
+# 1.Compile Spark outside the container to prepare for the next step of 
generating documents using the container
+build/sbt -Phive -Pkinesis-asl clean unidoc package
+
+# 2.build container image
+docker buildx build \
+  --cache-from type=registry,ref="${DOCKER_CACHE_IMG}" \
+  --tag "${IMG_URL}" "${FWDIR}"
+
+# 3.build docs on container: `error docs`, `scala doc`, `python doc`, `sql doc`
+docker run \
+  --mount type=bind,source="${SPARK_HOME}",target="${DOCKER_MOUNT_SPARK_HOME}" 
\

Review Comment:
   If the container is going to write files to the mounted path, please make 
sure file permissions won't prevent the host user from accessing or deleting 
them. For example, if the container writes files as uid 0, the host user may 
have no permission to delete them.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48847:
URL: https://github.com/apache/spark/pull/48847#discussion_r1843582476


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/parameters.scala:
##
@@ -189,7 +189,8 @@ object BindParameters extends ParameterizedQueryProcessor 
with QueryErrorsBase {
   // We should wait for `CTESubstitution` to resolve CTE before binding 
parameters, as CTE
   // relations are not children of `UnresolvedWith`.

Review Comment:
   I'm thinking of a different approach: shall we explicitly match 
`UnresolvedWith` here and bind parameters in the CTE relations? Then we no 
longer need this hack to delay parameter binding.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


mikhailnik-db commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843674767


##
sql/core/src/test/resources/sql-tests/results/listagg.sql.out:
##
@@ -0,0 +1,436 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, 
NULL)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT listagg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT string_agg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b, NULL) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b) FROM df WHERE 1 != 1
+-- !query schema
+struct
+-- !query output
+NULL
+
+
+-- !query
+SELECT listagg(b, '|') FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+b|c
+c|d
+
+
+-- !query
+SELECT listagg(a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(DISTINCT a) FROM df
+-- !query schema
+struct
+-- !query output
+ab
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df
+-- !query schema
+struct
+-- !query output
+NULL
+a
+b
+ba
+ba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+b|a|b|a
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭB��
+
+
+-- !query
+SELECT listagg(a), listagg(b, ',') FROM df2
+-- !query schema
+struct
+-- !query output
+123true,false,false
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",

Review Comment:
   Should be `DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE`. Fixed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50325][SQL] Factor out alias resolution to be reused in the single-pass Analyzer [spark]

2024-11-15 Thread via GitHub


MaxGekk commented on PR #48857:
URL: https://github.com/apache/spark/pull/48857#issuecomment-2478677345

   +1, LGTM. Merging to master.
   Thank you, @vladimirg-db.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50325][SQL] Factor out alias resolution to be reused in the single-pass Analyzer [spark]

2024-11-15 Thread via GitHub


MaxGekk closed pull request #48857: [SPARK-50325][SQL] Factor out alias 
resolution to be reused in the single-pass Analyzer
URL: https://github.com/apache/spark/pull/48857


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


mikhailnik-db commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843673496


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala:
##
@@ -20,9 +20,28 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.catalyst.expressions.SortOrder
 
 /**
- * The trait used to set the [[SortOrder]] after inverse distribution 
functions parsed.
+ * The trait used to set the [[SortOrder]] for supporting functions.
  */
-trait SupportsOrderingWithinGroup { self: AggregateFunction =>
-  def orderingFilled: Boolean = false
+trait SupportsOrderingWithinGroup {
   def withOrderingWithinGroup(orderingWithinGroup: Seq[SortOrder]): 
AggregateFunction
+
+  /** Indicator that ordering was set. */
+  def orderingFilled: Boolean
+
+  /**
+   * Tells Analyzer that WITHIN GROUP (ORDER BY ...) is mandatory for function.
+   *
+   * @see [[QueryCompilationErrors.functionMissingWithinGroupError]]
+   * @see [[org.apache.spark.sql.catalyst.analysis.Analyzer]]

Review Comment:
   In my head, the idea was to use it together with the error function to find 
the exact line, but the error function alone is enough.
   Removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]

2024-11-15 Thread via GitHub


MaxGekk closed pull request #48847: [SPARK-50322][SQL] Fix parameterized 
identifier in a sub-query
URL: https://github.com/apache/spark/pull/48847


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]

2024-11-15 Thread via GitHub


MaxGekk commented on PR #48847:
URL: https://github.com/apache/spark/pull/48847#issuecomment-2478680192

   Merging to master. Thank you, @srielau and @cloud-fan for review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50049][SQL] Support custom driver metrics in writing to v2 table [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on PR #48573:
URL: https://github.com/apache/spark/pull/48573#issuecomment-2478369932

   thanks, merging to master!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]

2024-11-15 Thread via GitHub


MaxGekk commented on code in PR #48847:
URL: https://github.com/apache/spark/pull/48847#discussion_r1843530688


##
sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala:
##
@@ -741,4 +741,21 @@ class ParametersSuite extends QueryTest with 
SharedSparkSession with PlanTest {
 Row("c1"))
 }
   }
+
+  test("SPARK-50322: parameterized identifier in a sub-query") {
+withTable("tt1") {
+  sql("create table tt1(c1 int)")
+  sql("insert into tt1 values (1)")
+  def query(p: String): String = {
+s"""
+  |with v1 as (
+  |  select * from tt1
+  |  where 1 = (Select * from identifier($p))

Review Comment:
   I uppercased all SQL keywords in the test.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


mikhailnik-db commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843656733


##
sql/core/src/test/resources/sql-tests/results/listagg.sql.out:
##
@@ -0,0 +1,436 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, 
NULL)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT listagg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT string_agg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b, NULL) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b) FROM df WHERE 1 != 1
+-- !query schema
+struct
+-- !query output
+NULL
+
+
+-- !query
+SELECT listagg(b, '|') FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+b|c
+c|d
+
+
+-- !query
+SELECT listagg(a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(DISTINCT a) FROM df
+-- !query schema
+struct
+-- !query output
+ab
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df
+-- !query schema
+struct
+-- !query output
+NULL
+a
+b
+ba
+ba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+b|a|b|a
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭB��
+
+
+-- !query
+SELECT listagg(a), listagg(b, ',') FROM df2
+-- !query schema
+struct
+-- !query output
+123true,false,false
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+"error" : "','",
+"hint" : ""
+  }
+}
+
+
+-- !query
+SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+"dataType" : "(\"BINARY\" or \"STRING\")",
+"functionName" : "`listagg`",
+"sqlExpr" : "\"listagg(c1, , )\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex" : 24,
+"fragment" : "listagg(c1, ', ')"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(b, a) FROM df GROUP BY a
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+"inputExpr" : "\"a\"",
+"inputName" : "`delimiter`",
+"inputType" : "\"STRING\"",
+"sqlExpr" : "\"listagg(b, a)\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex" : 20,
+"fragment" : "listagg(b, a)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct
+-- !query output
+NULL
+aa
+aa
+aabb
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+  "sqlState" : "42601",
+  "messageParameters" : {
+"aggFunc" : "\"listagg(a, NULL, a)\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIn

Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


mikhailnik-db commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843655281


##
sql/core/src/test/resources/sql-tests/results/listagg.sql.out:
##
@@ -0,0 +1,436 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, 
NULL)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT listagg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT string_agg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b, NULL) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b) FROM df WHERE 1 != 1
+-- !query schema
+struct
+-- !query output
+NULL
+
+
+-- !query
+SELECT listagg(b, '|') FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+b|c
+c|d
+
+
+-- !query
+SELECT listagg(a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(DISTINCT a) FROM df
+-- !query schema
+struct
+-- !query output
+ab
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df
+-- !query schema
+struct
+-- !query output
+NULL
+a
+b
+ba
+ba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+b|a|b|a
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭB��
+
+
+-- !query
+SELECT listagg(a), listagg(b, ',') FROM df2
+-- !query schema
+struct
+-- !query output
+123true,false,false
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+"error" : "','",
+"hint" : ""
+  }
+}
+
+
+-- !query
+SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+"dataType" : "(\"BINARY\" or \"STRING\")",
+"functionName" : "`listagg`",
+"sqlExpr" : "\"listagg(c1, , )\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex" : 24,
+"fragment" : "listagg(c1, ', ')"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(b, a) FROM df GROUP BY a
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+"inputExpr" : "\"a\"",
+"inputName" : "`delimiter`",
+"inputType" : "\"STRING\"",
+"sqlExpr" : "\"listagg(b, a)\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex" : 20,
+"fragment" : "listagg(b, a)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct
+-- !query output
+NULL
+aa
+aa
+aabb
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+  "sqlState" : "42601",
+  "messageParameters" : {
+"aggFunc" : "\"listagg(a, NULL, a)\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIn

Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


mikhailnik-db commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843664606


##
sql/core/src/test/resources/sql-tests/results/listagg.sql.out:
##
@@ -0,0 +1,436 @@
+-- Automatically generated by SQLQueryTestSuite
+-- !query
+CREATE TEMP VIEW df AS
+SELECT * FROM (VALUES ('a', 'b'), ('a', 'c'), ('b', 'c'), ('b', 'd'), (NULL, 
NULL)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+CREATE TEMP VIEW df2 AS
+SELECT * FROM (VALUES (1, true), (2, false), (3, false)) AS t(a, b)
+-- !query schema
+struct<>
+-- !query output
+
+
+
+-- !query
+SELECT listagg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT string_agg(b) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b, NULL) FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+bc
+cd
+
+
+-- !query
+SELECT listagg(b) FROM df WHERE 1 != 1
+-- !query schema
+struct
+-- !query output
+NULL
+
+
+-- !query
+SELECT listagg(b, '|') FROM df GROUP BY a
+-- !query schema
+struct
+-- !query output
+NULL
+b|c
+c|d
+
+
+-- !query
+SELECT listagg(a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(DISTINCT a) FROM df
+-- !query schema
+struct
+-- !query output
+ab
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a DESC) OVER (PARTITION BY b) FROM df
+-- !query schema
+struct
+-- !query output
+NULL
+a
+b
+ba
+ba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b) FROM df
+-- !query schema
+struct
+-- !query output
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a, '|') WITHIN GROUP (ORDER BY b DESC) FROM df
+-- !query schema
+struct
+-- !query output
+b|a|b|a
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a ASC) FROM df
+-- !query schema
+struct
+-- !query output
+baba
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY b DESC, a DESC) FROM df
+-- !query schema
+struct
+-- !query output
+bbaa
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, NULL) FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭ��
+
+
+-- !query
+SELECT listagg(c1, X'42') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct
+-- !query output
+ޭB��
+
+
+-- !query
+SELECT listagg(a), listagg(b, ',') FROM df2
+-- !query schema
+struct
+-- !query output
+123true,false,false
+
+
+-- !query
+SELECT listagg(c1) FROM (VALUES (ARRAY['a', 'b'])) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.parser.ParseException
+{
+  "errorClass" : "PARSE_SYNTAX_ERROR",
+  "sqlState" : "42601",
+  "messageParameters" : {
+"error" : "','",
+"hint" : ""
+  }
+}
+
+
+-- !query
+SELECT listagg(c1, ', ') FROM (VALUES (X'DEAD'), (X'BEEF')) AS t(c1)
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.DATA_DIFF_TYPES",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+"dataType" : "(\"BINARY\" or \"STRING\")",
+"functionName" : "`listagg`",
+"sqlExpr" : "\"listagg(c1, , )\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex" : 24,
+"fragment" : "listagg(c1, ', ')"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(b, a) FROM df GROUP BY a
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "DATATYPE_MISMATCH.NON_FOLDABLE_INPUT",
+  "sqlState" : "42K09",
+  "messageParameters" : {
+"inputExpr" : "\"a\"",
+"inputName" : "`delimiter`",
+"inputType" : "\"STRING\"",
+"sqlExpr" : "\"listagg(b, a)\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIndex" : 20,
+"fragment" : "listagg(b, a)"
+  } ]
+}
+
+
+-- !query
+SELECT listagg(a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct
+-- !query output
+NULL
+aa
+aa
+aabb
+aabb
+
+
+-- !query
+SELECT listagg(a) WITHIN GROUP (ORDER BY a) OVER (ORDER BY a) FROM df
+-- !query schema
+struct<>
+-- !query output
+org.apache.spark.sql.catalyst.ExtendedAnalysisException
+{
+  "errorClass" : "INVALID_WINDOW_SPEC_FOR_AGGREGATION_FUNC",
+  "sqlState" : "42601",
+  "messageParameters" : {
+"aggFunc" : "\"listagg(a, NULL, a)\""
+  },
+  "queryContext" : [ {
+"objectType" : "",
+"objectName" : "",
+"startIndex" : 8,
+"stopIn

Re: [PR] [SPARK-42746][SQL] Implement LISTAGG function [spark]

2024-11-15 Thread via GitHub


mikhailnik-db commented on code in PR #48748:
URL: https://github.com/apache/spark/pull/48748#discussion_r1843670210


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/SupportsOrderingWithinGroup.scala:
##
@@ -20,9 +20,28 @@ package org.apache.spark.sql.catalyst.expressions.aggregate
 import org.apache.spark.sql.catalyst.expressions.SortOrder
 
 /**
- * The trait used to set the [[SortOrder]] after inverse distribution 
functions parsed.
+ * The trait used to set the [[SortOrder]] for supporting functions.
  */
-trait SupportsOrderingWithinGroup { self: AggregateFunction =>

Review Comment:
   Accidentally removed; put it back.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50081][SQL] Codegen Support for `XPath*`(by Invoke & RuntimeReplaceable) [spark]

2024-11-15 Thread via GitHub


panbingkun commented on PR #48610:
URL: https://github.com/apache/spark/pull/48610#issuecomment-2479038190

   > @panbingkun Could you resolve conflicts, please.
   
   Updated, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48847:
URL: https://github.com/apache/spark/pull/48847#discussion_r1843518188


##
sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala:
##
@@ -741,4 +741,21 @@ class ParametersSuite extends QueryTest with 
SharedSparkSession with PlanTest {
 Row("c1"))
 }
   }
+
+  test("SPARK-50322: parameterized identifier in a sub-query") {
+withTable("tt1") {
+  sql("create table tt1(c1 int)")
+  sql("insert into tt1 values (1)")
+  def query(p: String): String = {
+s"""
+  |with v1 as (
+  |  select * from tt1
+  |  where 1 = (Select * from identifier($p))

Review Comment:
   Or we should uppercase all SQL keywords.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48195][CORE] Save and reuse RDD/Broadcast created by SparkPlan [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on PR #48037:
URL: https://github.com/apache/spark/pull/48037#issuecomment-2478376297

   hmmm there are 4 suppressed exceptions for this simple query?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50309][SQL] Add documentation for SQL pipe syntax [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on code in PR #48852:
URL: https://github.com/apache/spark/pull/48852#discussion_r1843508769


##
docs/sql-pipe-syntax.md:
##
@@ -0,0 +1,540 @@
+---
+layout: global
+title: SQL Pipe Syntax
+displayTitle: SQL Pipe Syntax
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+ 
+ http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+### Syntax
+
+ Overview
+
+Apache Spark supports SQL pipe syntax which allows composing queries from 
combinations of operators.
+
+* Any query can have zero or more pipe operators as a suffix, delineated by 
the pipe character `|>`. 
+* Each pipe operator starts with one or more SQL keywords followed by its own 
grammar as described
+  in the table below.
+* Most of these operators reuse existing grammar for standard SQL clauses.
+* Operators can apply in any order, any number of times.
+
+`FROM ` is now a supported standalone query which behaves the same 
as
+`TABLE `. This provides a convenient starting place to begin a 
chained pipe SQL query,
+although it is possible to add one or more pipe operators to the end of any 
valid Spark SQL query
+with the same consistent behavior as written here.
+
+Please refer to the table at the end of this document for a full list of all 
supported operators
+and their semantics.
+
+ Example
+
+For example, this is query 13 from the TPC-H benchmark:
+
+```sql
+SELECT c_count, COUNT(*) AS custdist
+FROM
+  (SELECT c_custkey, COUNT(o_orderkey) c_count
+   FROM customer
+   LEFT OUTER JOIN orders ON c_custkey = o_custkey
+ AND o_comment NOT LIKE '%unusual%packages%' GROUP BY c_custkey
+  ) AS c_orders
+GROUP BY c_count
+ORDER BY custdist DESC, c_count DESC;
+```
+
+To write the same logic using SQL pipe operators, we express it like this:
+
+```sql
+FROM customer
+|> LEFT OUTER JOIN orders ON c_custkey = o_custkey
+   AND o_comment NOT LIKE '%unusual%packages%'
+|> AGGREGATE COUNT(o_orderkey) c_count
+   GROUP BY c_custkey
+|> AGGREGATE COUNT(*) AS custdist
+   GROUP BY c_count
+|> ORDER BY custdist DESC, c_count DESC;
+```
+
+ Source Tables
+
+To start a new query using SQL pipe syntax, use the `FROM ` or 
`TABLE `
+clause, which creates a relation comprising all rows from the source table. 
Then append one or more
+pipe operators to the end of this clause to perform further transformations.
+
+ Projections
+
+SQL pipe syntax supports composable ways to evaluate expressions. A major 
advantage of these
+projection features is that they support computing new expressions based on 
previous ones in an
+incremental way. No lateral column references are needed here since each 
operator applies
+independently on its input table, regardless of the order in which the 
operators appear. Each of
+these computed columns then becomes visible to use with the following operator.
+
+`SELECT` produces a new table by evaluating the provided expressions.
+It is possible to use `DISTINCT` and `*` as needed.
+This works like the outermost `SELECT` in a table subquery in regular Spark 
SQL.
+
+`EXTEND` adds new columns to the input table by evaluating the provided 
expressions.
+This also preserves table aliases.
+This works like `SELECT *, new_column` in regular Spark SQL.
+
+`DROP` removes columns from the input table.
+This is similar to `SELECT * EXCEPT (column)` in regular Spark SQL.
+
+`SET` replaces column values from the input table.
+This is similar to `SELECT * REPLACE (expression AS column)` in regular Spark 
SQL.
+
+`AS` forwards the input table and introduces a new alias for each row.
+
+ Aggregations
+
+In general, aggregation takes place differently using SQL pipe syntax as 
opposed to regular Spark
+SQL.
+
+To perform full-table aggregation, use the `AGGREGATE` operator with a list of 
aggregate
+expressions to evaluate. This returns one single row in the output table.
+
+To perform aggregation with grouping, use the `AGGREGATE` operator with a 
`GROUP BY` clause.
+This returns one row for each unique combination of values of the grouping 
expressions. The output
+table contains the evaluated grouping expressions followed by the evaluated 
aggregate functions.
+Grouping expressions support assigning aliases for purposes of referring to 
them in future
+operators. In this way, it is not necessary to re

[PR] [SPARK-50295][INFRA] Add a script to build docs with image [spark]

2024-11-15 Thread via GitHub


panbingkun opened a new pull request, #48860:
URL: https://github.com/apache/spark/pull/48860

   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50226][SQL] Correct MakeDTInterval and MakeYMInterval to catch Java exceptions [spark]

2024-11-15 Thread via GitHub


gotocoding-DB commented on code in PR #48773:
URL: https://github.com/apache/spark/pull/48773#discussion_r1842229690


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/intervalExpressions.scala:
##
@@ -568,17 +572,28 @@ case class MakeYMInterval(years: Expression, months: 
Expression)
   override def dataType: DataType = YearMonthIntervalType()
 
   override def nullSafeEval(year: Any, month: Any): Any = {
-Math.toIntExact(Math.addExact(month.asInstanceOf[Number].longValue(),
-  Math.multiplyExact(year.asInstanceOf[Number].longValue(), 
MONTHS_PER_YEAR)))
+try {
+  Math.toIntExact(
+Math.addExact(month.asInstanceOf[Int],
+  Math.multiplyExact(year.asInstanceOf[Int], MONTHS_PER_YEAR)))
+} catch {

Review Comment:
   Ok, I'll do it a bit later.
   UPD: https://github.com/apache/spark/pull/48848



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50049][SQL] Support custom driver metrics in writing to v2 table [spark]

2024-11-15 Thread via GitHub


cloud-fan closed pull request #48573: [SPARK-50049][SQL] Support custom driver 
metrics in writing to v2 table
URL: https://github.com/apache/spark/pull/48573


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50237][SQL] Assign appropriate error condition for `_LEGACY_ERROR_TEMP_2138-9`: `CIRCULAR_CLASS_REFERENCE` [spark]

2024-11-15 Thread via GitHub


MaxGekk commented on PR #48769:
URL: https://github.com/apache/spark/pull/48769#issuecomment-2478475873

   +1, LGTM. Merging to master.
   Thank you, @itholic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50237][SQL] Assign appropriate error condition for `_LEGACY_ERROR_TEMP_2138-9`: `CIRCULAR_CLASS_REFERENCE` [spark]

2024-11-15 Thread via GitHub


MaxGekk closed pull request #48769: [SPARK-50237][SQL] Assign appropriate error 
condition for `_LEGACY_ERROR_TEMP_2138-9`: `CIRCULAR_CLASS_REFERENCE`
URL: https://github.com/apache/spark/pull/48769


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50091][SQL] Handle case of aggregates in left-hand operand of IN-subquery [spark]

2024-11-15 Thread via GitHub


bersprockets commented on PR #48627:
URL: https://github.com/apache/spark/pull/48627#issuecomment-2479232015

   cc @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50322][SQL] Fix parameterized identifier in a sub-query [spark]

2024-11-15 Thread via GitHub


MaxGekk commented on PR #48847:
URL: https://github.com/apache/spark/pull/48847#issuecomment-2479259749

   We don't need to backport this fix to `branch-3.5` because it doesn't have 
the changes from https://github.com/apache/spark/pull/47180.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49490][SQL] Add benchmarks for initCap [spark]

2024-11-15 Thread via GitHub


mrk-andreev commented on code in PR #48501:
URL: https://github.com/apache/spark/pull/48501#discussion_r1844407932


##
sql/core/benchmarks/CollationBenchmark-jdk21-results.txt:
##
@@ -1,54 +1,88 @@
-OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws
+Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 collation unit benchmarks - equalsFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns) Relative time
 
--
-UTF8_BINARY  1349   1349   
0  0.1   13485.4   1.0X
-UTF8_LCASE   3559   3561   
3  0.0   35594.3   2.6X
-UNICODE 17580  17589   
   12  0.0  175803.6  13.0X
-UNICODE_CI  17210  17212   
2  0.0  172100.2  12.8X
+UTF8_BINARY  2220   2223   
5  0.0   22197.0   1.0X
+UTF8_LCASE   4949   4950   
2  0.0   49488.1   2.2X
+UNICODE 28172  28198   
   36  0.0  281721.0  12.7X
+UNICODE_CI  28233  28308   
  106  0.0  282328.2  12.7X
 
-OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws
+Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 collation unit benchmarks - compareFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns) Relative time
 
---
-UTF8_BINARY   1740   1741  
 1  0.1   17398.8   1.0X
-UTF8_LCASE2630   2632  
 3  0.0   26301.0   1.5X
-UNICODE  16732  16743  
16  0.0  167319.7   9.6X
-UNICODE_CI   16482  16492  
14  0.0  164819.7   9.5X
+UTF8_BINARY   2731   2733  
 2  0.0   27313.6   1.0X
+UTF8_LCASE4611   4619  
11  0.0   46111.4   1.7X
+UNICODE  28149  28211  
88  0.0  281486.8  10.3X
+UNICODE_CI   27535  27597  
89  0.0  275348.4  10.1X
 
-OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws
+Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 collation unit benchmarks - hashFunction:  Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns) Relative time
 

-UTF8_BINARY2808   2808 
  0  0.0   28082.3   1.0X
-UTF8_LCASE 5412   5413 
  1  0.0   54123.5   1.9X
-UNICODE   70755  70787 
 44  0.0  707553.4  25.2X
-UNICODE_CI57639  57669 
 43  0.0  576390.0  20.5X
+UTF8_BINARY4603   4618 
 22  0.0   46031.3   1.0X
+UTF8_LCASE 9510   9518 
 11  0.0   95097.7   2.1X
+UNICODE  135718 135786 
 97  0.0 1357176.2  29.5X
+UNICODE_CI   113715 113819 
148  0.0 1137145.8  24.7X
 
-OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws
+Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 collation unit benchmarks - contains: Best Time(ms)   Avg Time(ms)   
Stdev(ms)Rate(M/s)   Per Row(ns) Relative time
 

Re: [PR] [SPARK-50017] Support Avro encoding for TransformWithState operator [spark]

2024-11-15 Thread via GitHub


ericm-db commented on code in PR #48401:
URL: https://github.com/apache/spark/pull/48401#discussion_r1844412927


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -115,7 +119,7 @@ case class TransformWithStateExec(
* Fetching the columnFamilySchemas from the StatefulProcessorHandle
* after init is called.
*/
-  private def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
+  def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
 val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas
 closeProcessorHandle()
 columnFamilySchemas

Review Comment:
   Actually, I don't think you can make it static - we need the 
`statefulProcessor` that is passed into this particular instance of the 
`TransformWithStateExec` class.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50236][SQL] Assign appropriate error condition for `_LEGACY_ERROR_TEMP_1156`: `COLUMN_NOT_DEFINED_IN_TABLE ` [spark]

2024-11-15 Thread via GitHub


MaxGekk closed pull request #48768: [SPARK-50236][SQL] Assign appropriate error 
condition for `_LEGACY_ERROR_TEMP_1156`: `COLUMN_NOT_DEFINED_IN_TABLE `
URL: https://github.com/apache/spark/pull/48768


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50236][SQL] Assign appropriate error condition for `_LEGACY_ERROR_TEMP_1156`: `COLUMN_NOT_DEFINED_IN_TABLE ` [spark]

2024-11-15 Thread via GitHub


MaxGekk commented on PR #48768:
URL: https://github.com/apache/spark/pull/48768#issuecomment-2479344627

   +1, LGTM. Merging to master.
   Thank you, @itholic.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50130][SQL][FOLLOWUP] Make Encoder generation lazy [spark]

2024-11-15 Thread via GitHub


ueshin commented on code in PR #48829:
URL: https://github.com/apache/spark/pull/48829#discussion_r1844320464


##
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##
@@ -95,13 +95,12 @@ private[sql] object Dataset {
   def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame =
 sparkSession.withActive {
   val qe = sparkSession.sessionState.executePlan(logicalPlan)
-  val encoder = if (qe.isLazyAnalysis) {
-RowEncoder.encoderFor(new StructType())
+  if (qe.isLazyAnalysis) {

Review Comment:
   The lazy encoder creation only happens when `qe.isLazyAnalysis`; otherwise 
the encoder is created right here, the same as before, to avoid changing the 
behavior. `Dataset` has a new default constructor that takes a function for 
the lazy case, and the original default constructor is still there to keep 
the behavior.
   The suggested cleanup doesn't keep the original behavior for the non-lazy case.
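   A minimal sketch of the constructor layout described above, with invented 
names rather than Spark's actual `Dataset` signatures: the eager path builds 
the encoder immediately (original behavior), while the lazy path defers it 
behind a function.
   
   ```scala
   // Illustrative only; names are invented and not Spark's Dataset API.
   class EncoderHolder[T] private (encoderFn: () => T) {
     // Computed on first access only, for the lazy-analysis case.
     lazy val encoder: T = encoderFn()
   }
   
   object EncoderHolder {
     // Original behavior: the encoder is created right away and just stored.
     def eager[T](encoder: T): EncoderHolder[T] = new EncoderHolder(() => encoder)
   
     // Lazy case: the encoder is only created if/when it is actually accessed.
     def lazily[T](encoderFn: () => T): EncoderHolder[T] = new EncoderHolder(encoderFn)
   }
   ```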



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50017] Support Avro encoding for TransformWithState operator [spark]

2024-11-15 Thread via GitHub


brkyvz commented on code in PR #48401:
URL: https://github.com/apache/spark/pull/48401#discussion_r1844340161


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -2204,6 +2204,16 @@ object SQLConf {
   .intConf
   .createWithDefault(3)
 
+  val STREAMING_STATE_STORE_ENCODING_FORMAT =
+buildConf("spark.sql.streaming.stateStore.encodingFormat")
+  .doc("The encoding format used for stateful operators to store 
information " +
+"in the state store")
+  .version("4.0.0")
+  .stringConf
+  .checkValue(v => Set("UnsafeRow", "Avro").contains(v),
+"Valid values are 'UnsafeRow' and 'Avro'")

Review Comment:
   nit: do we want to be case insensitive here?
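   If we do go case-insensitive, a minimal sketch could normalize the value 
before validating it (assuming the usual `transform` helper on the config 
builder; not verified against this PR):
   
   ```scala
   import java.util.Locale
   
   val STREAMING_STATE_STORE_ENCODING_FORMAT =
     buildConf("spark.sql.streaming.stateStore.encodingFormat")
       .doc("The encoding format used for stateful operators to store information " +
         "in the state store")
       .version("4.0.0")
       .stringConf
       // Normalize so "Avro", "avro" and "AVRO" are all accepted.
       .transform(_.toLowerCase(Locale.ROOT))
       .checkValue(v => Set("unsaferow", "avro").contains(v),
         "Valid values are 'UnsafeRow' and 'Avro'")
   ```
   
   Note that code reading the conf would then need to compare against the 
lowercased values.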



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -115,7 +119,7 @@ case class TransformWithStateExec(
* Fetching the columnFamilySchemas from the StatefulProcessorHandle
* after init is called.
*/
-  private def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
+  def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
 val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas
 closeProcessorHandle()
 columnFamilySchemas

Review Comment:
   should this be moved to a static method?



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##
@@ -552,9 +570,9 @@ class IncrementalExecution(
   // The rule below doesn't change the plan but can cause the side effect 
that
   // metadata/schema is written in the checkpoint directory of stateful 
operator.

Review Comment:
   existing: oof, this is not great...



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##
@@ -552,9 +570,9 @@ class IncrementalExecution(
   // The rule below doesn't change the plan but can cause the side effect 
that
   // metadata/schema is written in the checkpoint directory of stateful 
operator.
   planWithStateOpId transform StateSchemaAndOperatorMetadataRule.rule
-
-  simulateWatermarkPropagation(planWithStateOpId)
-  planWithStateOpId transform WatermarkPropagationRule.rule
+  val planWithStateSchemas = planWithStateOpId transform 
StateStoreColumnFamilySchemasRule.rule
+  simulateWatermarkPropagation(planWithStateSchemas)
+  planWithStateSchemas transform WatermarkPropagationRule.rule

Review Comment:
   ditto



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##
@@ -259,6 +259,24 @@ class IncrementalExecution(
 }
   }
 
+  /**
+   * This rule populates the column family schemas for the 
TransformWithStateExec
+   * operator to ship them from the driver, where the schema and serializer 
objects
+   * are created, to the executor.
+   */
+  object StateStoreColumnFamilySchemasRule extends SparkPlanPartialRule {
+override val rule: PartialFunction[SparkPlan, SparkPlan] = {
+  case statefulOp: StatefulOperator =>
+statefulOp match {
+  case op: TransformWithStateExec =>
+op.copy(
+  columnFamilySchemas = op.getColFamilySchemas()
+)

Review Comment:
   This is a bit confusing IMHO. If you have the `getColFamilySchemas` method 
available as part of the class, why do you have to set it on the class with a 
copy?
   
   Two possible suggestions:
   1. Make `getColFamilySchemas` a static method. Not sure if that's possible, 
though, looking at the logic a bit more in TransformWithStateExec. It feels 
weird that you're opening and closing these handles just to get some of the 
information out.
   2. Add a comment here that this needs to be run on the driver, and also 
rename the method to `withColumnFamilySchemas`, which calls copy internally 
(see the sketch below).
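   For suggestion 2, a rough sketch of what that helper could look like (the 
name comes from the suggestion above; this is not code from the PR):
   
   ```scala
   // Hypothetical helper on TransformWithStateExec, per suggestion 2 above.
   // Driver-only: getColFamilySchemas() opens and closes the driver processor handle.
   def withColumnFamilySchemas: TransformWithStateExec =
     copy(columnFamilySchemas = getColFamilySchemas())
   ```
   
   The rule body would then read 
`case op: TransformWithStateExec => op.withColumnFamilySchemas`.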



##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateStoreColumnFamilySchemaUtils.scala:
##
@@ -71,6 +224,33 @@ object StateStoreColumnFamilySchemaUtils {
   stateName,
   keySchema,
   valSchema,
-  Some(PrefixKeyScanStateEncoderSpec(keySchema, 1)))
+  Some(PrefixKeyScanStateEncoderSpec(keySchema, 1)),
+  avroEnc = getAvroSerde(
+StructType(keySchema.take(1)),
+valSchema,
+Some(StructType(keySchema.drop(1)))
+  ))
+  }
+
+  // This function creates the StateStoreColFamilySchema for
+  // Timers' secondary index.
+  // Because we want to encode fixed-length types as binary types
+  // if we are using Avro, we need to do some schema conversion to ensure
+  // we can use range scan
+  def getTimerStateSchemaForSecIndex(
+  stateName: String,
+  keySchema: StructType,
+  valSchema: StructType): StateStoreColFamilySchema = {
+val avroKeySchema = StateStoreColumnFamilySchemaUtils.
+  convertForRangeScan(keySchema, Seq(0))
+

[PR] [DRAFT] Two string types [spark]

2024-11-15 Thread via GitHub


stefankandic opened a new pull request, #48861:
URL: https://github.com/apache/spark/pull/48861

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49490][SQL] Add benchmarks for initCap [spark]

2024-11-15 Thread via GitHub


mrk-andreev commented on code in PR #48501:
URL: https://github.com/apache/spark/pull/48501#discussion_r1844404602


##
sql/core/benchmarks/CollationBenchmark-jdk21-results.txt:
##
@@ -1,54 +1,88 @@
-OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws
+Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 collation unit benchmarks - equalsFunction:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative time
 ---------------------------------------------------------------------------------------------------------------------------------
-UTF8_BINARY                                           1353           1357           5          0.1       13532.2           1.0X
-UTF8_LCASE                                            2601           2602           2          0.0       26008.0           1.9X
-UNICODE                                              16745          16756          16          0.0      167450.9          12.4X
-UNICODE_CI                                           16590          16627          52          0.0      165904.8          12.3X
+UTF8_BINARY                                           2220           2223           5          0.0       22197.0           1.0X
+UTF8_LCASE                                            4949           4950           2          0.0       49488.1           2.2X
+UNICODE                                              28172          28198          36          0.0      281721.0          12.7X
+UNICODE_CI                                           28233          28308         106          0.0      282328.2          12.7X
 
-OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws
+Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 collation unit benchmarks - compareFunction:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative time
 ---------------------------------------------------------------------------------------------------------------------------------
-UTF8_BINARY                                            1746           1746           0          0.1       17462.6           1.0X
-UTF8_LCASE                                             2629           2630           1          0.0       26294.8           1.5X
-UNICODE                                               16744          16744           0          0.0      167438.6           9.6X
-UNICODE_CI                                            16518          16521           4          0.0      165180.2           9.5X
+UTF8_BINARY                                            2731           2733           2          0.0       27313.6           1.0X
+UTF8_LCASE                                             4611           4619          11          0.0       46111.4           1.7X
+UNICODE                                               28149          28211          88          0.0      281486.8          10.3X
+UNICODE_CI                                            27535          27597          89          0.0      275348.4          10.1X
 
-OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws
+Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 collation unit benchmarks - hashFunction:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative time
 ---------------------------------------------------------------------------------------------------------------------------------
-UTF8_BINARY                                         2808           2808           1          0.0       28076.2           1.0X
-UTF8_LCASE                                          5409           5410           0          0.0       54093.0           1.9X
-UNICODE                                            67930          67957          38          0.0      679296.7          24.2X
-UNICODE_CI                                         56004          56005           1          0.0      560044.2          19.9X
+UTF8_BINARY                                         4603           4618          22          0.0       46031.3           1.0X
+UTF8_LCASE                                          9510           9518          11          0.0       95097.7           2.1X
+UNICODE                                           135718         135786          97          0.0     1357176.2          29.5X
+UNICODE_CI                                        113715         113819         148          0.0     1137145.8          24.7X
 
-OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.5.0-1025-azure
-AMD EPYC 7763 64-Core Processor
+OpenJDK 64-Bit Server VM 21.0.5+11-LTS on Linux 6.8.0-1017-aws
+Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 collation unit benchmarks - contains:  Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative time
 

[PR] [SPARK-50301][SS] Make TransformWithState metrics reflect their intuitive meanings [spark]

2024-11-15 Thread via GitHub


neilramaswamy opened a new pull request, #48862:
URL: https://github.com/apache/spark/pull/48862

   ### What changes were proposed in this pull request?
   
   These changes make the following changes to metrics in TWS:
   
   - `allUpdatesTimeMs` now captures the time it takes to process all the new 
data with the user's stateful processor.
   - `timerProcessingTimeMs` was added to capture the time it takes to process 
all the user's timers.
   - `allRemovalsTimeMs` now captures the time it takes to do TTL cleanup at 
the end of a micro-batch.
   - `commitTimeMs` now captures _only_ the time it takes to commit the state, 
not the TTL cleanup.
   
   With these metrics, a user can have a fairly clear picture of where time is 
being spent in a micro-batch that uses TWS:
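   
   For illustration only (not code from this PR), a rough sketch of how the split 
could look inside the operator, assuming the `longMetric`/`timeTakenMs` helpers 
available to stateful operators; `store` stands for the operator's state store, and 
the phase functions (`processNewData`, `processExpiredTimers`, `doTtlCleanup`) are 
placeholder names, not the operator's real methods:
   
   ```scala
   // Each phase is timed into its own metric, matching the list above.
   val allUpdatesTimeMs = longMetric("allUpdatesTimeMs")
   val timerProcessingTimeMs = longMetric("timerProcessingTimeMs")
   val allRemovalsTimeMs = longMetric("allRemovalsTimeMs")
   val commitTimeMs = longMetric("commitTimeMs")

   allUpdatesTimeMs += timeTakenMs { processNewData() }            // user's stateful processor
   timerProcessingTimeMs += timeTakenMs { processExpiredTimers() } // user's timer callbacks
   allRemovalsTimeMs += timeTakenMs { doTtlCleanup() }             // TTL cleanup only
   commitTimeMs += timeTakenMs { store.commit() }                  // state commit only
   ```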
   
   
   ### Why are the changes needed?
   
   The metrics today misrepresent what they're actually measuring.
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. Metrics for TWS are changing. However, since TWS is `private[sql]`, 
this shouldn't impact any real users.
   
   
   ### How was this patch tested?
   
   We don't have any way to test these metrics in _any_ stateful operator for 
streaming today.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-45592][SPARK-45282][SQL] Correctness issue in AQE with InMemoryTableScanExec [spark]

2024-11-15 Thread via GitHub


Tom-Newton commented on PR #43760:
URL: https://github.com/apache/spark/pull/43760#issuecomment-2479390482

   > The fix addresses the issue by disabling coalescing in InMemoryTableScan 
for shuffles in the final stage.
   
   This PR seems to indicate that there will be a correctness bug if we allow 
coalescing in the final stage. Is this still true after 
https://github.com/apache/spark/pull/43435, which seems like it's fixing the same 
issue? I have not noticed any correctness issues in this area, but I have a 
use case where this PR causes a major performance regression.
   
   I notice that https://github.com/apache/spark/pull/45054 brings back the 
option to enable coalescing in InMemoryTableScan for shuffles in the final 
stage. If I do this, my performance problem is resolved, but will I be at risk of 
the correctness bug again?
   
   
 Details on my performance regression case
   
 If you configure `spark.sql.shuffle.partitions` or 
`spark.sql.adaptive.coalescePartitions.initialPartitionNum` to a large number 
(we currently use 8192), the following code ends up using 8192 partitions and is 
really slow as a result.
 ```python
   # Assumes an active SparkSession bound to `spark`; `sf` comes from this import.
   from pyspark.sql import functions as sf

   df = spark.createDataFrame(
       [
           {
               "group": "group0",
               "df1_column": 1,
           },
           {
               "group": "group0",
               "df1_column": 1,
           },
       ]
   )
   df = df.groupBy("group").agg(sf.max("df1_column"))
   df.cache()
   df.explain()
   df.show()
 ```
   With this PR: 40 seconds 
   Without this PR: 2 seconds
   
   Maybe it's our mistake to use 
`spark.sql.adaptive.coalescePartitions.initialPartitionNum: 8192`, but I don't 
really see any generic way to configure this appropriately. My strategy has just 
been to make it bigger than we would ever need and rely on AQE to coalesce to 
something sensible, but my understanding could be lacking.
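   
   For reference, that strategy amounts to something like the following (a minimal 
sketch; the values are just the ones mentioned in this thread, and it assumes a 
`SparkSession` bound to `spark`):
   
   ```scala
   // Deliberately high initial partition count, relying on AQE to coalesce it down.
   spark.conf.set("spark.sql.adaptive.enabled", "true")
   spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true")
   spark.conf.set("spark.sql.adaptive.coalescePartitions.initialPartitionNum", "8192")
   ```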
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50017] Support Avro encoding for TransformWithState operator [spark]

2024-11-15 Thread via GitHub


ericm-db commented on code in PR #48401:
URL: https://github.com/apache/spark/pull/48401#discussion_r1844389128


##
connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDataToCatalyst.scala:
##
@@ -24,6 +24,7 @@ import org.apache.avro.generic.GenericDatumReader
 import org.apache.avro.io.{BinaryDecoder, DecoderFactory}
 
 import org.apache.spark.SparkException
+import org.apache.spark.sql.avro.SchemaConverters

Review Comment:
   No. Because we moved the file to a different directory, we had to add imports.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50017] Support Avro encoding for TransformWithState operator [spark]

2024-11-15 Thread via GitHub


ericm-db commented on code in PR #48401:
URL: https://github.com/apache/spark/pull/48401#discussion_r1844390849


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TransformWithStateExec.scala:
##
@@ -115,7 +119,7 @@ case class TransformWithStateExec(
* Fetching the columnFamilySchemas from the StatefulProcessorHandle
* after init is called.
*/
-  private def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
+  def getColFamilySchemas(): Map[String, StateStoreColFamilySchema] = {
 val columnFamilySchemas = getDriverProcessorHandle().getColumnFamilySchemas
 closeProcessorHandle()
 columnFamilySchemas

Review Comment:
   Good question - I guess if we can pass the stateful processor in, it can be. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50017] Support Avro encoding for TransformWithState operator [spark]

2024-11-15 Thread via GitHub


ericm-db commented on code in PR #48401:
URL: https://github.com/apache/spark/pull/48401#discussion_r1844389823


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/IncrementalExecution.scala:
##
@@ -259,6 +259,24 @@ class IncrementalExecution(
 }
   }
 
+  /**
+   * This rule populates the column family schemas for the 
TransformWithStateExec
+   * operator to ship them from the driver, where the schema and serializer 
objects
+   * are created, to the executor.
+   */
+  object StateStoreColumnFamilySchemasRule extends SparkPlanPartialRule {
+override val rule: PartialFunction[SparkPlan, SparkPlan] = {
+  case statefulOp: StatefulOperator =>
+statefulOp match {
+  case op: TransformWithStateExec =>

Review Comment:
   Yeah, it will be.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50309][DOCS] Document `SQL Pipe` Syntax [spark]

2024-11-15 Thread via GitHub


dtenedor commented on code in PR #48852:
URL: https://github.com/apache/spark/pull/48852#discussion_r1844576182


##
docs/sql-pipe-syntax.md:
##
@@ -0,0 +1,540 @@
+---
+layout: global
+title: SQL Pipe Syntax
+displayTitle: SQL Pipe Syntax
+license: |
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+ 
+ http://www.apache.org/licenses/LICENSE-2.0
+ 
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+---
+
+### Syntax
+
+ Overview
+
+Apache Spark supports SQL pipe syntax which allows composing queries from 
combinations of operators.
+
+* Any query can have zero or more pipe operators as a suffix, delineated by 
the pipe character `|>`. 
+* Each pipe operator starts with one or more SQL keywords followed by its own 
grammar as described
+  in the table below.
+* Most of these operators reuse existing grammar for standard SQL clauses.
+* Operators can apply in any order, any number of times.
+
+`FROM <tableName>` is now a supported standalone query which behaves the same 
as
+`TABLE <tableName>`. This provides a convenient starting place to begin a 
chained pipe SQL query,
+although it is possible to add one or more pipe operators to the end of any 
valid Spark SQL query
+with the same consistent behavior as written here.
+
+Please refer to the table at the end of this document for a full list of all 
supported operators
+and their semantics.
+
+ Example
+
+For example, this is query 13 from the TPC-H benchmark:
+
+```sql
+SELECT c_count, COUNT(*) AS custdist
+FROM
+  (SELECT c_custkey, COUNT(o_orderkey) c_count
+   FROM customer
+   LEFT OUTER JOIN orders ON c_custkey = o_custkey
+ AND o_comment NOT LIKE '%unusual%packages%' GROUP BY c_custkey
+  ) AS c_orders
+GROUP BY c_count
+ORDER BY custdist DESC, c_count DESC;
+```
+
+To write the same logic using SQL pipe operators, we express it like this:
+
+```sql
+FROM customer
+|> LEFT OUTER JOIN orders ON c_custkey = o_custkey
+   AND o_comment NOT LIKE '%unusual%packages%'
+|> AGGREGATE COUNT(o_orderkey) c_count
+   GROUP BY c_custkey
+|> AGGREGATE COUNT(*) AS custdist
+   GROUP BY c_count
+|> ORDER BY custdist DESC, c_count DESC;
+```
+
+ Source Tables
+
+To start a new query using SQL pipe syntax, use the `FROM <tableName>` or 
`TABLE <tableName>`
+clause, which creates a relation comprising all rows from the source table. 
Then append one or more
+pipe operators to the end of this clause to perform further transformations.
+
+ Projections
+
+SQL pipe syntax supports composable ways to evaluate expressions. A major 
advantage of these
+projection features is that they support computing new expressions based on 
previous ones in an
+incremental way. No lateral column references are needed here since each 
operator applies
+independently on its input table, regardless of the order in which the 
operators appear. Each of
+these computed columns then becomes visible to use with the following operator.
+
+`SELECT` produces a new table by evaluating the provided expressions.
+It is possible to use `DISTINCT` and `*` as needed.
+This works like the outermost `SELECT` in a table subquery in regular Spark 
SQL.
+
+`EXTEND` adds new columns to the input table by evaluating the provided 
expressions.
+This also preserves table aliases.
+This works like `SELECT *, new_column` in regular Spark SQL.
+
+`DROP` removes columns from the input table.
+This is similar to `SELECT * EXCEPT (column)` in regular Spark SQL.
+
+`SET` replaces column values from the input table.
+This is similar to `SELECT * REPLACE (expression AS column)` in regular Spark 
SQL.
+
+`AS` forwards the input table and introduces a new alias for each row.
+
+ Aggregations
+
+In general, aggregation takes place differently using SQL pipe syntax as 
opposed to regular Spark
+SQL.
+
+To perform full-table aggregation, use the `AGGREGATE` operator with a list of 
aggregate
+expressions to evaluate. This returns one single row in the output table.
+
+To perform aggregation with grouping, use the `AGGREGATE` operator with a 
`GROUP BY` clause.
+This returns one row for each unique combination of values of the grouping 
expressions. The output
+table contains the evaluated grouping expressions followed by the evaluated 
aggregate functions.
+Grouping expressions support assigning aliases for purposes of referring to 
them in future
+operators. In this way, it is not necessary to rep

Re: [PR] [SPARK-49787][SQL] Cast between UDT and other types [spark]

2024-11-15 Thread via GitHub


cloud-fan commented on PR #48251:
URL: https://github.com/apache/spark/pull/48251#issuecomment-2480296031

   I don't think ANSI SQL allows casting between UDT and built-in types. Shall 
we use the `UnwrapUDT` expression to unwrap expressions that return UDT in the 
store assignment rule `ResolveOutputRelation`?
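   
   For concreteness, a rough sketch of that idea (illustrative only; the helper 
name and import paths are assumptions, and where exactly it would hook into 
`ResolveOutputRelation` is left open):
   
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{Expression, UnwrapUDT}
   import org.apache.spark.sql.types.UserDefinedType

   // Strip the UDT wrapper from an input expression so store assignment compares
   // the underlying SQL type against the target column's type.
   def unwrapIfUDT(expr: Expression): Expression = expr.dataType match {
     case _: UserDefinedType[_] => UnwrapUDT(expr)
     case _ => expr
   }
   ```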


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-49787][SQL] Cast between UDT and other types [spark]

2024-11-15 Thread via GitHub


viirya commented on PR #48251:
URL: https://github.com/apache/spark/pull/48251#issuecomment-2480395470

   > I don't think ANSI SQL allows casting between UDT and built-in types. Shall 
we use the `UnwrapUDT` expression to unwrap expressions that return UDT in the 
store assignment rule `ResolveOutputRelation`?
   
   Sounds good.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


