Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on PR #46873:
URL: https://github.com/apache/spark/pull/46873#issuecomment-2230167466

   Thank you for pinging me, @LuciferYang .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48890][CORE][SS] Add Structured Streaming related fields to log4j ThreadContext [spark]

2024-07-16 Thread via GitHub


WweiL commented on code in PR #47340:
URL: https://github.com/apache/spark/pull/47340#discussion_r1678863246


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala:
##
@@ -287,6 +290,11 @@ abstract class StreamExecution(
   sparkSession.sparkContext.setJobGroup(runId.toString, 
getBatchDescriptionString,
 interruptOnCancel = true)
   sparkSession.sparkContext.setLocalProperty(StreamExecution.QUERY_ID_KEY, 
id.toString)
+  loggingThreadContext = CloseableThreadContext.putAll(
+Map(
+  StreamExecution.QUERY_ID_KEY -> id.toString,

Review Comment:
   oh yes I was using that, forgot to push update to github!



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48752][PYTHON][CONNECT][DOCS] Introduce `pyspark.logger` for improved structured logging for PySpark [spark]

2024-07-16 Thread via GitHub


itholic commented on code in PR #47145:
URL: https://github.com/apache/spark/pull/47145#discussion_r1678866930


##
python/docs/source/development/logger.rst:
##
@@ -0,0 +1,151 @@
+..  Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+..http://www.apache.org/licenses/LICENSE-2.0
+
+..  Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+
+==
+Logging in PySpark
+==
+
+.. currentmodule:: pyspark.logger
+
+Introduction
+
+
+The :ref:`pyspark.logger` module facilitates 
structured client-side logging for PySpark users.
+
+This module includes a :class:`PySparkLogger` class that provides several 
methods for logging messages at different levels in a structured JSON format:
+
+- :meth:`PySparkLogger.log_info`
+- :meth:`PySparkLogger.log_warn`
+- :meth:`PySparkLogger.log_error`
+
+The logger can be easily configured to write logs to either the console or a 
specified file.
+
+Customizing Log Format
+==
+The default log format is JSON, which includes the timestamp, log level, 
logger name, and the log message along with any additional context provided.
+
+Example log entry:
+
+.. code-block:: python
+
+{
+  "ts": "2024-06-28T10:53:48.528Z",
+  "level": "ERROR",
+  "logger": "DataFrameQueryContextLogger",
+  "msg": "[DIVIDE_BY_ZERO] Division by zero.",
+  "context": {
+"file": "/path/to/file.py",

Review Comment:
   @gengliangwang Just updated PR. Now the log will include `exception` field 
as below when error occurs:
   
   ```json
   {
 "ts": "2024-06-28 19:53:48,563",
 "level": "ERROR",
 "logger": "DataFrameQueryContextLogger",
 "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate 
divisor being 0 and return NULL instead. If necessary set 
\"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 
22012\n== DataFrame ==\n\"__truediv__\" was called 
from\n/.../spark/python/test_error_context.py:17\n", 
 "context": {
   "file": "/.../spark/python/test_error_context.py",
   "line_no": "17",
   "fragment": "__truediv__"
   "error_class": "DIVIDE_BY_ZERO"
 },
 "exception": {
   "class": "Py4JJavaError",
   "msg": "An error occurred while calling o52.showString.\n: 
org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. 
Use `try_divide` to tolerate divisor being 0 and return NULL instead. If 
necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. 
SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called 
from\n/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22\n\n\tat
 
org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)\n\tat
 
org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)\n\tat
 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown
 Source)\n\tat 
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown
 Source)\n\tat 
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n
 \tat 
org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat
 
org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat
 org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)\n\tat 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)\n\tat
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat 
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)\n\tat 
org.apache.spark.rdd.RDD.iterator(RDD.scala:333)\n\tat 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat 
org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat
 org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkE
 rrorUtils.scala:64)\n\tat 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFin

Re: [PR] [SPARK-44790][SQL] XML: to_xml implementation and bindings for python, connect and SQL [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on code in PR #43503:
URL: https://github.com/apache/spark/pull/43503#discussion_r1678869244


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala:
##
@@ -16,164 +16,201 @@
  */
 package org.apache.spark.sql.catalyst.xml
 
+import java.io.Writer
 import java.sql.Timestamp
-import javax.xml.stream.XMLStreamWriter
+import javax.xml.stream.XMLOutputFactory
 
 import scala.collection.Map
 
+import com.sun.xml.txw2.output.IndentingXMLStreamWriter

Review Comment:
   Thank you for answering, @HyukjinKwon . Let me file a JIRA in order not to 
forget this. I guess we need a careful look on this kind of part before Apache 
Spark 4.0.0 release.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on code in PR #46873:
URL: https://github.com/apache/spark/pull/46873#discussion_r1678871813


##
core/src/main/scala/org/apache/spark/util/Utils.scala:
##
@@ -3072,15 +3072,14 @@ private[spark] object Utils
*/
   lazy val isG1GC: Boolean = {
 Try {
+  // SPARK-48505: If the initialization probe of `HotSpotDiagnosticMXBean` 
is successful,
+  // the API of `HotSpotDiagnosticMXBean` can be used directly in 
subsequent operations,
+  // instead of invoking it through reflection.
   val clazz = 
Utils.classForName("com.sun.management.HotSpotDiagnosticMXBean")
-.asInstanceOf[Class[_ <: PlatformManagedObject]]
-  val vmOptionClazz = Utils.classForName("com.sun.management.VMOption")
+  import com.sun.management.HotSpotDiagnosticMXBean

Review Comment:
   Although the code becomes simpler, I'm not sure this is a better recommended 
style: `import com.sun.*` in our source code.
   
   There is a similar discussion here.
   - https://github.com/apache/spark/pull/43503#discussion_r1671300283
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48873][SQL] Use UnsafeRow in JSON parser. [spark]

2024-07-16 Thread via GitHub


chenhao-db commented on code in PR #47310:
URL: https://github.com/apache/spark/pull/47310#discussion_r1678874248


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -4367,6 +4367,14 @@ object SQLConf {
   .booleanConf
   .createWithDefault(true)
 
+  val JSON_USE_UNSAFE_ROW =
+buildConf("spark.sql.json.useUnsafeRow")
+  .internal()
+  .doc("When set to true, use UnsafeRow to represent struct result in the 
JSON parser.")
+  .version("4.0.0")
+  .booleanConf
+  .createWithDefault(false)

Review Comment:
   I feel it is better to not enable it by default. The benchmark has shown 
that applying the change can slow down the JSON scan a bit. If there is enough 
memory, then saving memory doesn't have enough benefit.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48873][SQL] Use UnsafeRow in JSON parser. [spark]

2024-07-16 Thread via GitHub


chenhao-db commented on PR #47310:
URL: https://github.com/apache/spark/pull/47310#issuecomment-2230185352

   @HyukjinKwon thanks! could you help merge it? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on PR #46873:
URL: https://github.com/apache/spark/pull/46873#issuecomment-2230191426

   BTW, I saw the first attempt here and the comment. 
   - #46783
   
   IIUC, he said he is okay with the old code.
   > I'd say I'm okay with the old reflection-based version for its preciseness.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [MINOR][TESTS] Remove unused test jar (udf_noA.jar) [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on PR #47309:
URL: https://github.com/apache/spark/pull/47309#issuecomment-2230213227

   😄  Ya, this is the 7th instance.
   - https://github.com/apache/spark/pulls?q=is%3Apr+is%3Aclosed+is%3Amerged
   
   Given that the last one was last year, this is an annual event.
   - #43188
   
   Do you think we need to prevent this kind of mistake by updating our 
committer docs or changing the following rule?
   
   
https://github.com/apache/spark/blob/3923d7f8fcafb86fcbffcedf4215a314fe5b4011/.asf.yaml#L30-L33


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47307][SQL][3.5] Add a config to optionally chunk base64 strings [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on PR #47325:
URL: https://github.com/apache/spark/pull/47325#issuecomment-2230214873

   Thank you, @wForget and @yaooqinn !


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]

2024-07-16 Thread via GitHub


LuciferYang commented on code in PR #46873:
URL: https://github.com/apache/spark/pull/46873#discussion_r1678908450


##
core/src/main/scala/org/apache/spark/util/Utils.scala:
##
@@ -3072,15 +3072,14 @@ private[spark] object Utils
*/
   lazy val isG1GC: Boolean = {
 Try {
+  // SPARK-48505: If the initialization probe of `HotSpotDiagnosticMXBean` 
is successful,
+  // the API of `HotSpotDiagnosticMXBean` can be used directly in 
subsequent operations,
+  // instead of invoking it through reflection.
   val clazz = 
Utils.classForName("com.sun.management.HotSpotDiagnosticMXBean")
-.asInstanceOf[Class[_ <: PlatformManagedObject]]
-  val vmOptionClazz = Utils.classForName("com.sun.management.VMOption")
+  import com.sun.management.HotSpotDiagnosticMXBean

Review Comment:
   Thanks @dongjoon-hyun ,understood, that makes sense. Let's keep things as 
they are, I will close this pr.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]

2024-07-16 Thread via GitHub


LuciferYang commented on code in PR #46873:
URL: https://github.com/apache/spark/pull/46873#discussion_r1678908450


##
core/src/main/scala/org/apache/spark/util/Utils.scala:
##
@@ -3072,15 +3072,14 @@ private[spark] object Utils
*/
   lazy val isG1GC: Boolean = {
 Try {
+  // SPARK-48505: If the initialization probe of `HotSpotDiagnosticMXBean` 
is successful,
+  // the API of `HotSpotDiagnosticMXBean` can be used directly in 
subsequent operations,
+  // instead of invoking it through reflection.
   val clazz = 
Utils.classForName("com.sun.management.HotSpotDiagnosticMXBean")
-.asInstanceOf[Class[_ <: PlatformManagedObject]]
-  val vmOptionClazz = Utils.classForName("com.sun.management.VMOption")
+  import com.sun.management.HotSpotDiagnosticMXBean

Review Comment:
   Thanks @dongjoon-hyun ,understood, that makes sense. Let's keep things as 
they are; I will close this pr.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]

2024-07-16 Thread via GitHub


LuciferYang closed pull request #46873: [SPARK-48505][CORE] Simplify the 
implementation of `Utils#isG1GC`
URL: https://github.com/apache/spark/pull/46873


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [MINOR][TESTS] Remove unused test jar (udf_noA.jar) [spark]

2024-07-16 Thread via GitHub


grundprinzip commented on PR #47309:
URL: https://github.com/apache/spark/pull/47309#issuecomment-2230264319

   Conceptually yes, but I wanted to spend some time understanding the delta 
between the merge button and the script and how GH might have changed in 
between. One idea would be to use a GH action for the script as well that 
triggers by a keyword for example so that one does not always need to push 
directly from a local machine.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [MINOR][TESTS] Remove unused test jar (udf_noA.jar) [spark]

2024-07-16 Thread via GitHub


HyukjinKwon commented on PR #47309:
URL: https://github.com/apache/spark/pull/47309#issuecomment-2230287146

   Yeah I think we should block it merge button


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-44790][SQL] XML: to_xml implementation and bindings for python, connect and SQL [spark]

2024-07-16 Thread via GitHub


HyukjinKwon commented on code in PR #43503:
URL: https://github.com/apache/spark/pull/43503#discussion_r1678954714


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala:
##
@@ -16,164 +16,201 @@
  */
 package org.apache.spark.sql.catalyst.xml
 
+import java.io.Writer
 import java.sql.Timestamp
-import javax.xml.stream.XMLStreamWriter
+import javax.xml.stream.XMLOutputFactory
 
 import scala.collection.Map
 
+import com.sun.xml.txw2.output.IndentingXMLStreamWriter

Review Comment:
   @sandip-db would you mind taking a look and followup when you find some time?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48873][SQL] Use UnsafeRow in JSON parser. [spark]

2024-07-16 Thread via GitHub


HyukjinKwon commented on PR #47310:
URL: https://github.com/apache/spark/pull/47310#issuecomment-2230290146

   Merged to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48873][SQL] Use UnsafeRow in JSON parser. [spark]

2024-07-16 Thread via GitHub


HyukjinKwon closed pull request #47310: [SPARK-48873][SQL] Use UnsafeRow in 
JSON parser.
URL: https://github.com/apache/spark/pull/47310


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48873][SQL] Use UnsafeRow in JSON parser. [spark]

2024-07-16 Thread via GitHub


LuciferYang commented on PR #47310:
URL: https://github.com/apache/spark/pull/47310#issuecomment-2230296253

   Sorry, forgot to review this one. Late LGTM


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48865][SQL] Add try_url_decode function [spark]

2024-07-16 Thread via GitHub


yaooqinn commented on code in PR #47294:
URL: https://github.com/apache/spark/pull/47294#discussion_r1678963389


##
sql/core/src/test/resources/sql-tests/inputs/url-functions.sql:
##
@@ -17,4 +17,10 @@ select url_encode(null);
 select url_decode('https%3A%2F%2Fspark.apache.org');
 select url_decode('http%3A%2F%2spark.apache.org');
 select url_decode('inva lid://user:pass@host/file\\;param?query\\;p2');
-select url_decode(null);
\ No newline at end of file
+select url_decode(null);
+
+-- try_url_decode function
+select try_url_decode('https%3A%2F%2Fspark.apache.org');
+select try_url_decode('http%3A%2F%2spark.apache.org');
+select try_url_decode('inva lid://user:pass@host/file\\;param?query\\;p2');
+select try_url_decode(null);

Review Comment:
   ```suggestion
   select try_url_decode(null);
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [DO-NOT-MERGE][SQL] Reduce the number of shuffles in SQL in local mode [spark]

2024-07-16 Thread via GitHub


pan3793 commented on code in PR #47349:
URL: https://github.com/apache/spark/pull/47349#discussion_r1678971316


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -641,14 +641,19 @@ object SQLConf {
 .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must 
be positive.")
 .createOptional
 
+  // Lazily get the number of cores to make sure SparkContext created first.
+  private lazy val defaultShufflePartition: Option[Int] = 
SparkContext.getActive.flatMap { sc =>
+if (sc.isLocal) Some(SparkContext.numDriverCores(sc.master)) else None

Review Comment:
   maybe numDriverCores * 2 or 3? in production practice, we usually expect a 
larger(2 or 3 times of available exec cores) parallelism of the stage to tackle 
uneven task runtimes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-48907][SQL] Fix the value `explicitTypes` in `COLLATION_MISMATCH.EXPLICIT` [spark]

2024-07-16 Thread via GitHub


panbingkun opened a new pull request, #47365:
URL: https://github.com/apache/spark/pull/47365

   
   
   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48865][SQL] Add try_url_decode function [spark]

2024-07-16 Thread via GitHub


yaooqinn commented on PR #47294:
URL: https://github.com/apache/spark/pull/47294#issuecomment-2230342454

   Also cc @cloud-fan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48761][SQL] Introduce clusterBy DataFrameWriter API for Scala [spark]

2024-07-16 Thread via GitHub


cloud-fan commented on code in PR #47301:
URL: https://github.com/apache/spark/pull/47301#discussion_r1679019564


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala:
##
@@ -201,6 +201,22 @@ final class DataFrameWriter[T] private[sql] (ds: 
Dataset[T]) {
 this
   }
 
+  /**
+   * Clusters the output by the given columns on the file system. The rows 
with matching values in

Review Comment:
   let's be a bit more general as data sources are not always based on file 
system. How about `... given columns on the storage.`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48761][SQL] Introduce clusterBy DataFrameWriter API for Scala [spark]

2024-07-16 Thread via GitHub


cloud-fan commented on code in PR #47301:
URL: https://github.com/apache/spark/pull/47301#discussion_r1679022463


##
connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala:
##
@@ -201,6 +201,22 @@ final class DataFrameWriter[T] private[sql] (ds: 
Dataset[T]) {
 this
   }
 
+  /**
+   * Clusters the output by the given columns on the file system. The rows 
with matching values in
+   * the specified clustering columns will be consolidated within the same 
file.

Review Comment:
   ditto, `... will be consolidated within the same group.`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48761][SQL] Introduce clusterBy DataFrameWriter API for Scala [spark]

2024-07-16 Thread via GitHub


cloud-fan commented on code in PR #47301:
URL: https://github.com/apache/spark/pull/47301#discussion_r1679027390


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala:
##
@@ -209,10 +209,25 @@ object ClusterBySpec {
   normalizeClusterBySpec(schema, clusterBySpec, resolver).toJson
   }
 
+  /**
+   * Converts a ClusterBySpec to a map of table properties used to store the 
clustering
+   * information in the table catalog.
+   *
+   * @param clusterBySpec : existing ClusterBySpec to be converted to 
properties.
+   */
+  def toProperties(clusterBySpec: ClusterBySpec): Map[String, String] = {

Review Comment:
   what's the difference between this and `toProperty`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48761][SQL] Introduce clusterBy DataFrameWriter API for Scala [spark]

2024-07-16 Thread via GitHub


cloud-fan commented on code in PR #47301:
URL: https://github.com/apache/spark/pull/47301#discussion_r1679031615


##
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala:
##
@@ -104,9 +106,27 @@ final class DataFrameWriterV2[T] private[sql](table: 
String, ds: Dataset[T])
 }
 
 this.partitioning = Some(asTransforms)
+validatePartitioning()
+this
+  }
+
+  @scala.annotation.varargs
+  override def clusterBy(colName: String, colNames: String*): 
CreateTableWriter[T] = {
+this.clustering =
+  Some(ClusterByTransform((colName +: colNames).map(col => 
FieldReference(col
+validatePartitioning()
 this
   }
 
+  /**
+   * Validate that clusterBy is not used with partitionBy.
+   */
+  private def validatePartitioning(): Unit = {
+if (partitioning.nonEmpty && clustering.nonEmpty) {

Review Comment:
   `DataFrameWriterV2` does not have `bucketBy`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48907][SQL] Fix the value `explicitTypes` in `COLLATION_MISMATCH.EXPLICIT` [spark]

2024-07-16 Thread via GitHub


panbingkun commented on PR #47365:
URL: https://github.com/apache/spark/pull/47365#issuecomment-2230411842

   cc @mihailom-db @cloud-fan @HyukjinKwon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48907][SQL] Fix the value `explicitTypes` in `COLLATION_MISMATCH.EXPLICIT` [spark]

2024-07-16 Thread via GitHub


panbingkun commented on code in PR #47365:
URL: https://github.com/apache/spark/pull/47365#discussion_r1679059383


##
sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala:
##
@@ -3675,7 +3675,7 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
 new AnalysisException(
   errorClass = "COLLATION_MISMATCH.EXPLICIT",
   messageParameters = Map(
-"explicitTypes" -> toSQLId(explicitTypes)
+"explicitTypes" -> explicitTypes.map(toSQLId).mkString(", ")

Review Comment:
   fix it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48896][ML][MLLIB] Avoid repartition when writing out the metadata [spark]

2024-07-16 Thread via GitHub


HyukjinKwon commented on PR #47347:
URL: https://github.com/apache/spark/pull/47347#issuecomment-2230473641

   Sure, I will separate the PR. Thanks for reviewing this closely 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-48909][ML][MLlib] Uses SparkSession over SparkContext when writing metadata [spark]

2024-07-16 Thread via GitHub


HyukjinKwon opened a new pull request, #47366:
URL: https://github.com/apache/spark/pull/47366

   ### What changes were proposed in this pull request?
   
   This PR proposes to use SparkSession over SparkContext when writing metadata
   
   ### Why are the changes needed?
   
   See https://github.com/apache/spark/pull/47347#issuecomment-2229701812
   
   ### Does this PR introduce _any_ user-facing change?
   
   No.
   
   ### How was this patch tested?
   
   Existing tests should cover it.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48763][CONNECT][BUILD] Move connect server and common to builtin module [spark]

2024-07-16 Thread via GitHub


bjornjorgensen commented on PR #47157:
URL: https://github.com/apache/spark/pull/47157#issuecomment-2230502432

   oh.. my fault there was something else that did not work as intended on my 
build system. 
   @HyukjinKwon thanks for double checking.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-48910] Use HashSet to avoid linear searches in PreprocessTableCreation [spark]

2024-07-16 Thread via GitHub


vladimirg-db opened a new pull request, #47367:
URL: https://github.com/apache/spark/pull/47367

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [DO-NOT-MERGE][SQL] Reduce the number of shuffles in SQL in local mode [spark]

2024-07-16 Thread via GitHub


HyukjinKwon commented on code in PR #47349:
URL: https://github.com/apache/spark/pull/47349#discussion_r1679158703


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -641,14 +641,19 @@ object SQLConf {
 .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must 
be positive.")
 .createOptional
 
+  // Lazily get the number of cores to make sure SparkContext created first.
+  private lazy val defaultShufflePartition: Option[Int] = 
SparkContext.getActive.flatMap { sc =>
+if (sc.isLocal) Some(SparkContext.numDriverCores(sc.master)) else None

Review Comment:
   This is only for `local[...]` so I think more common cases are just trying 
small data out in their local file system -well maybe this isn't the case also. 
I will need to run some benchmark though.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48896][ML][MLLIB] Avoid repartition when writing out the metadata [spark]

2024-07-16 Thread via GitHub


HyukjinKwon commented on PR #47347:
URL: https://github.com/apache/spark/pull/47347#issuecomment-2230562169

   https://github.com/apache/spark/pull/47366 👍 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [DO-NOT-MERGE][SQL] Reduce the number of shuffles in SQL in local mode [spark]

2024-07-16 Thread via GitHub


HyukjinKwon commented on code in PR #47349:
URL: https://github.com/apache/spark/pull/47349#discussion_r1679158703


##
sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala:
##
@@ -641,14 +641,19 @@ object SQLConf {
 .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must 
be positive.")
 .createOptional
 
+  // Lazily get the number of cores to make sure SparkContext created first.
+  private lazy val defaultShufflePartition: Option[Int] = 
SparkContext.getActive.flatMap { sc =>
+if (sc.isLocal) Some(SparkContext.numDriverCores(sc.master)) else None

Review Comment:
   This is only for `local[...]` so I think more common cases are just trying 
small data out in their local file system -well maybe this isn't the case also. 
I will need to run some benchmark.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-48510] Fix for UDAF `toColumn` API when running tests in Maevn [spark]

2024-07-16 Thread via GitHub


xupefei opened a new pull request, #47368:
URL: https://github.com/apache/spark/pull/47368

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [ONLY TEST][SQL] Improve TPCDSCollationQueryTestSuite [spark]

2024-07-16 Thread via GitHub


panbingkun opened a new pull request, #47369:
URL: https://github.com/apache/spark/pull/47369

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-36680][SQL][FOLLOWUP] Files with options should be put into resolveDataSource function [spark]

2024-07-16 Thread via GitHub


logze opened a new pull request, #47370:
URL: https://github.com/apache/spark/pull/47370

   
   
   ### What changes were proposed in this pull request?
   
   When reading csv, json and other files, pass the options parameter to the 
rules.resolveDataSource method to make the options parameter effective.
   
   This is a bug fix for [#46707](https://github.com/apache/spark/pull/46707)
   
   ### Why are the changes needed?
   
   For the following SQL, the options parameter passed in does not take effect. 
This is because the rules.resolve DataSource method does not pass the options 
parameter during the datasource construction process
   `SELECT * FROM csv.`/test/data.csv` WITH (`header` = true, 'delimiter' = 
'|')`
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   Unit test in SQLQuerySuite
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SC-170296] GROUP BY with MapType nested inside complex type [spark]

2024-07-16 Thread via GitHub


nebojsa-db commented on code in PR #47331:
URL: https://github.com/apache/spark/pull/47331#discussion_r1679332626


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala:
##
@@ -892,132 +892,108 @@ case class MapFromEntries(child: Expression)
 copy(child = newChild)
 }
 
+// Sorts all MapType expressions based on the ordering of their keys.
+// This is used when GROUP BY is done with a MapType (possibly nested) column.
 case class MapSort(base: Expression)
-  extends UnaryExpression with NullIntolerant with QueryErrorsBase {
+  extends UnaryExpression with NullIntolerant with QueryErrorsBase with 
CodegenFallback {
 
-  val keyType: DataType = base.dataType.asInstanceOf[MapType].keyType
-  val valueType: DataType = base.dataType.asInstanceOf[MapType].valueType
+  override lazy val canonicalized: Expression = base.canonicalized
+
+  override lazy val deterministic: Boolean = base.deterministic
 
   override def child: Expression = base
 
   override def dataType: DataType = base.dataType
 
-  override def checkInputDataTypes(): TypeCheckResult = base.dataType match {
-case m: MapType if RowOrdering.isOrderable(m.keyType) =>
-TypeCheckResult.TypeCheckSuccess
+  def recursiveCheckDataTypes(dataType: DataType): TypeCheckResult = dataType 
match {
+case a: ArrayType => recursiveCheckDataTypes(a.elementType)
+case StructType(fields) =>
+  fields.collect(sf => 
recursiveCheckDataTypes(sf.dataType)).filter(_.isFailure).headOption
+.getOrElse(TypeCheckResult.TypeCheckSuccess)
+case m: MapType if RowOrdering.isOrderable(m.keyType) => 
TypeCheckResult.TypeCheckSuccess
 case _: MapType =>
   DataTypeMismatch(
 errorSubClass = "INVALID_ORDERING_TYPE",
 messageParameters = Map(
   "functionName" -> toSQLId(prettyName),
-  "dataType" -> toSQLType(base.dataType)
+  "dataType" -> toSQLType(dataType)
 )
   )
-case _ =>
-  DataTypeMismatch(
+case _ => TypeCheckResult.TypeCheckSuccess
+  }
+
+  override def checkInputDataTypes(): TypeCheckResult = {
+if (!dataType.existsRecursively(_.isInstanceOf[MapType])) {
+  return DataTypeMismatch(
 errorSubClass = "UNEXPECTED_INPUT_TYPE",
 messageParameters = Map(
   "paramIndex" -> ordinalNumber(0),
   "requiredType" -> toSQLType(MapType),
   "inputSql" -> toSQLExpr(base),
   "inputType" -> toSQLType(base.dataType))
   )
-  }
-
-  override def nullSafeEval(array: Any): Any = {
-// put keys and their respective values inside a tuple and sort them
-// according to the key ordering. Extract the new sorted k/v pairs to form 
a sorted map
-
-val mapData = array.asInstanceOf[MapData]
-val numElements = mapData.numElements()
-val keys = mapData.keyArray()
-val values = mapData.valueArray()
-
-val ordering = PhysicalDataType.ordering(keyType)
-
-val sortedMap = Array
-  .tabulate(numElements)(i => (keys.get(i, keyType).asInstanceOf[Any],
-values.get(i, valueType).asInstanceOf[Any]))
-  .sortBy(_._1)(ordering)
-
-new ArrayBasedMapData(new GenericArrayData(sortedMap.map(_._1)),
-  new GenericArrayData(sortedMap.map(_._2)))
-  }
-
-  override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
-nullSafeCodeGen(ctx, ev, b => sortCodegen(ctx, ev, b))
-  }
-
-  private def sortCodegen(ctx: CodegenContext, ev: ExprCode,
-  base: String): String = {
-
-val arrayBasedMapData = classOf[ArrayBasedMapData].getName
-val genericArrayData = classOf[GenericArrayData].getName
-
-val numElements = ctx.freshName("numElements")
-val keys = ctx.freshName("keys")
-val values = ctx.freshName("values")
-val sortArray = ctx.freshName("sortArray")
-val i = ctx.freshName("i")
-val o1 = ctx.freshName("o1")
-val o1entry = ctx.freshName("o1entry")
-val o2 = ctx.freshName("o2")
-val o2entry = ctx.freshName("o2entry")
-val c = ctx.freshName("c")
-val newKeys = ctx.freshName("newKeys")
-val newValues = ctx.freshName("newValues")
-
-val boxedKeyType = CodeGenerator.boxedType(keyType)
-val boxedValueType = CodeGenerator.boxedType(valueType)
-val javaKeyType = CodeGenerator.javaType(keyType)
+}
 
-val simpleEntryType = s"java.util.AbstractMap.SimpleEntry<$boxedKeyType, 
$boxedValueType>"
+if (dataType.existsRecursively(dt =>
+  dt.isInstanceOf[MapType] && 
!RowOrdering.isOrderable(dt.asInstanceOf[MapType].keyType))) {
+  DataTypeMismatch(
+errorSubClass = "INVALID_ORDERING_TYPE",
+messageParameters = Map(
+  "functionName" -> toSQLId(prettyName),
+  "dataType" -> toSQLType(dataType)
+)
+  )
+}
 
-val comp = if (CodeGenerator.isPrimitiveType(keyType)) {
-  val v1 = ctx.freshName("v1")
-  val v2 = ctx.freshName("v2")
-  s"""
- |$javaKeyType $v1 = (($boxedKeyType) $o1).

[PR] [SPARK-47307][DOCS][FOLLOWUP] Add a migration guide for the behavior change of base64 function [spark]

2024-07-16 Thread via GitHub


wForget opened a new pull request, #47371:
URL: https://github.com/apache/spark/pull/47371

   
   
   ### What changes were proposed in this pull request?
   
   Follow up to #47303
   
   Add a migration guide for the behavior change of `base64` function
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   No
   
   ### How was this patch tested?
   
   doc change
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47307][DOCS][FOLLOWUP] Add a migration guide for the behavior change of base64 function [spark]

2024-07-16 Thread via GitHub


wForget commented on PR #47371:
URL: https://github.com/apache/spark/pull/47371#issuecomment-2230858686

   cc @yaooqinn 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48628][CORE] Add task peak on/off heap memory metrics [spark]

2024-07-16 Thread via GitHub


Ngone51 commented on code in PR #47192:
URL: https://github.com/apache/spark/pull/47192#discussion_r1679431752


##
core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java:
##
@@ -202,6 +226,18 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer requestingConsu
 logger.debug("Task {} acquired {} for {}", taskAttemptId, 
Utils.bytesToString(got),
   requestingConsumer);
   }
+
+  if (mode == MemoryMode.OFF_HEAP) {
+synchronized (offHeapMemoryLock) {

Review Comment:
   The whole function `acquireExecutionMemory` is under the protection of 
`synchronized (this)`, and the release memory can be got from 
`trySpillAndAcquire()`:
   
   ```scala
   long released = consumerToSpill.spill(requested, requestingConsumer);
   ```
   
So I don't think we need extra lock here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47307][DOCS][FOLLOWUP] Add a migration guide for the behavior change of base64 function [spark]

2024-07-16 Thread via GitHub


pan3793 commented on code in PR #47371:
URL: https://github.com/apache/spark/pull/47371#discussion_r1679440173


##
docs/sql-migration-guide.md:
##
@@ -64,6 +64,7 @@ license: |
 ## Upgrading from Spark SQL 3.5.1 to 3.5.2
 
 - Since 3.5.2, MySQL JDBC datasource will read TINYINT UNSIGNED as ShortType, 
while in 3.5.1, it was wrongly read as ByteType.
+- Since 3.5.2, the `base64` function will return a non-chunked string. To 
restore the previous behavior where base64 encoded strings that chunked into 
lines of at most 76 characters, set 
`spark.sql.legacy.chunkBase64String.enabled` to `true`.

Review Comment:
   the behavior changed in spark 3.3.0, so "the previous behavior" might be 
ambiguous.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-44790][SQL] XML: to_xml implementation and bindings for python, connect and SQL [spark]

2024-07-16 Thread via GitHub


sandip-db commented on code in PR #43503:
URL: https://github.com/apache/spark/pull/43503#discussion_r1679453180


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala:
##
@@ -16,164 +16,201 @@
  */
 package org.apache.spark.sql.catalyst.xml
 
+import java.io.Writer
 import java.sql.Timestamp
-import javax.xml.stream.XMLStreamWriter
+import javax.xml.stream.XMLOutputFactory
 
 import scala.collection.Map
 
+import com.sun.xml.txw2.output.IndentingXMLStreamWriter

Review Comment:
   Ack. I didn't find a drop-in replacement for IndentingXMLStreamWriter. I can 
look into implementing our own version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48388][SQL] Fix SET statement behavior for SQL Scripts [spark]

2024-07-16 Thread via GitHub


cloud-fan commented on code in PR #47272:
URL: https://github.com/apache/spark/pull/47272#discussion_r1679508361


##
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##
@@ -61,11 +61,18 @@ compoundBody
 
 compoundStatement
 : statement
+| compoundSetStatement
 | beginEndCompoundBlock
 ;
 
+compoundSetStatement

Review Comment:
   why do we call it `compound`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48388][SQL] Fix SET statement behavior for SQL Scripts [spark]

2024-07-16 Thread via GitHub


cloud-fan commented on code in PR #47272:
URL: https://github.com/apache/spark/pull/47272#discussion_r1679511481


##
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##
@@ -251,26 +258,29 @@ statement
 | (MSCK)? REPAIR TABLE identifierReference
 (option=(ADD|DROP|SYNC) PARTITIONS)?   
#repairTable
 | op=(ADD | LIST) identifier .*?   
#manageResource
-| SET COLLATION collationName=identifier   
#setCollation
-| SET ROLE .*? 
#failNativeCommand
+| CREATE INDEX (IF errorCapturingNot EXISTS)? identifier ON TABLE?
+identifierReference (USING indexType=identifier)?
+LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
+(OPTIONS options=propertyList)?
#createIndex
+| DROP INDEX (IF EXISTS)? identifier ON TABLE? identifierReference 
#dropIndex
+| unsupportedHiveNativeCommands .*?
#failNativeCommand
+;
+
+setResetStatement
+: SET COLLATION collationName=identifier   
#setCollation
+| SET ROLE .*? 
#failSetRole
 | SET TIME ZONE interval   
#setTimeZone
 | SET TIME ZONE timezone   
#setTimeZone
 | SET TIME ZONE .*?
#setTimeZone
 | SET (VARIABLE | VAR) assignmentList  
#setVariable
 | SET (VARIABLE | VAR) LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
-  LEFT_PAREN query RIGHT_PAREN 
#setVariable
+LEFT_PAREN query RIGHT_PAREN   
#setVariable

Review Comment:
   can we avoid duplicating the SET VAR parser rule?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48861][SQL] Enable shuffle file removal/skipMigration for all SQL executions [spark]

2024-07-16 Thread via GitHub


abellina commented on PR #47360:
URL: https://github.com/apache/spark/pull/47360#issuecomment-2231084434

   @bozhang2820 @cloud-fan fyi


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48791][CORE][3.5] Fix perf regression caused by the accumulators registration overhead using CopyOnWriteArrayList [spark]

2024-07-16 Thread via GitHub


LuciferYang commented on PR #47297:
URL: https://github.com/apache/spark/pull/47297#issuecomment-2231142709

   > `Scala 2.13 build with SBT` failed:
   > 
   > ```
   > [error] 
/home/runner/work/spark/spark/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala:117:34:
 Symbol 'type scala.collection.compat.package.IterableOnce' is missing from the 
classpath.
   > [error] This symbol is required by 'type 
breeze.linalg.support.LowPrioCanTraverseValues.X'.
   > [error] Make sure that type IterableOnce is in your classpath and check 
for conflicting dependencies with `-Ylog-classpath`.
   > [error] A full rebuild may help if 'LowPrioCanTraverseValues.class' was 
compiled against an incompatible version of scala.collection.compat.package.
   > [error] val tol = Utils.EPSILON * max(d) * d.length
   > [error]  ^
   > [error] one error found
   > ```
   > 
   > @LuciferYang Do you have any idea why `mllib` module doesnt't have the 
depdency of `scala-collection-compat` since we have already directly managed 
that dependecy in `core` module?
   > 
   > ```
   > [INFO] --- dependency:3.6.0:tree (default-cli) @ spark-mllib_2.13 ---
   > [INFO] org.apache.spark:spark-mllib_2.13:jar:3.5.2-SNAPSHOT
   > [INFO] +- 
org.scala-lang.modules:scala-parser-combinators_2.13:jar:2.3.0:compile
   > [INFO] +- org.apache.spark:spark-core_2.13:jar:3.5.2-SNAPSHOT:compile
   > [INFO] |  \- org.scala-lang.modules:scala-xml_2.13:jar:2.1.0:compile
   > [INFO] \- 
org.scala-lang.modules:scala-parallel-collections_2.13:jar:1.0.4:compile
   > ```
   
   
   I reset to 45286ca locally, then ran `build/sbt "mllib/dependencyTree" 
-Pscala-2.13`. In the dependency tree, I can see `[info]   | 
+-org.scala-lang.modules:scala-collection-compat_2.13:2.7.0 [S]`, so I still 
have no clue about this issue.
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-47307][DOCS][FOLLOWUP] Add a migration guide for the behavior change of base64 function [spark]

2024-07-16 Thread via GitHub


wForget commented on code in PR #47371:
URL: https://github.com/apache/spark/pull/47371#discussion_r1679591679


##
docs/sql-migration-guide.md:
##
@@ -64,6 +64,7 @@ license: |
 ## Upgrading from Spark SQL 3.5.1 to 3.5.2
 
 - Since 3.5.2, MySQL JDBC datasource will read TINYINT UNSIGNED as ShortType, 
while in 3.5.1, it was wrongly read as ByteType.
+- Since 3.5.2, the `base64` function will return a non-chunked string. To 
restore the previous behavior where base64 encoded strings that chunked into 
lines of at most 76 characters, set 
`spark.sql.legacy.chunkBase64String.enabled` to `true`.

Review Comment:
   Makes sense, I made a little change, thank you.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48388][SQL] Fix SET statement behavior for SQL Scripts [spark]

2024-07-16 Thread via GitHub


davidm-db commented on code in PR #47272:
URL: https://github.com/apache/spark/pull/47272#discussion_r1679605344


##
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##
@@ -61,11 +61,18 @@ compoundBody
 
 compoundStatement
 : statement
+| compoundSetStatement
 | beginEndCompoundBlock
 ;
 
+compoundSetStatement

Review Comment:
   To differentiate between the rule used in regular SQL statements 
(`setResetStatement` that contains statements to set config values, but 
contains `setVariable` as well) and the rule used in SQL script compounds (that 
contain only version of variable set to be used in the scripts).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48388][SQL] Fix SET statement behavior for SQL Scripts [spark]

2024-07-16 Thread via GitHub


davidm-db commented on code in PR #47272:
URL: https://github.com/apache/spark/pull/47272#discussion_r1679616760


##
sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4:
##
@@ -251,26 +258,29 @@ statement
 | (MSCK)? REPAIR TABLE identifierReference
 (option=(ADD|DROP|SYNC) PARTITIONS)?   
#repairTable
 | op=(ADD | LIST) identifier .*?   
#manageResource
-| SET COLLATION collationName=identifier   
#setCollation
-| SET ROLE .*? 
#failNativeCommand
+| CREATE INDEX (IF errorCapturingNot EXISTS)? identifier ON TABLE?
+identifierReference (USING indexType=identifier)?
+LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN
+(OPTIONS options=propertyList)?
#createIndex
+| DROP INDEX (IF EXISTS)? identifier ON TABLE? identifierReference 
#dropIndex
+| unsupportedHiveNativeCommands .*?
#failNativeCommand
+;
+
+setResetStatement
+: SET COLLATION collationName=identifier   
#setCollation
+| SET ROLE .*? 
#failSetRole
 | SET TIME ZONE interval   
#setTimeZone
 | SET TIME ZONE timezone   
#setTimeZone
 | SET TIME ZONE .*?
#setTimeZone
 | SET (VARIABLE | VAR) assignmentList  
#setVariable
 | SET (VARIABLE | VAR) LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ
-  LEFT_PAREN query RIGHT_PAREN 
#setVariable
+LEFT_PAREN query RIGHT_PAREN   
#setVariable

Review Comment:
   Reasons for "duplication" (quotes because it's not exactly the same):
   - In standalone SQL statements, VAR or VARIABLE is mandatory
   - In SQL scripts, VAR or VARIABLE is not mandatory
   - In SQL scripts, we still want to allow VAR or VARIABLE to be "compatible" 
with standalone SQL statements
   
   This means that we can either:
   - Have one parser rule for set variable statements and reuse it, but handle 
this logic (and potential syntax exceptions) in the visitor functions
   - Have this handled in the grammar (one rule has ? where VAR or VARIABLE is 
not mandatory, the other doesn't)
   
   We decided on the second approach, but can change to the first one if you 
think so.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


sahnib commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1679584589


##
python/pyspark/worker.py:
##
@@ -1609,6 +1645,35 @@ def mapper(a):
 vals = [a[o] for o in parsed_offsets[0][1]]
 return f(keys, vals)
 
+elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE:
+from itertools import tee
+
+# We assume there is only one UDF here because grouped map doesn't
+# support combining multiple UDFs.
+assert num_udfs == 1
+
+# See FlatMapGroupsInPandasExec for how arg_offsets are used to

Review Comment:
   [nit] See TransformWithStateInPandasExec. 



##
python/pyspark/sql/streaming/stateful_processor.py:
##
@@ -0,0 +1,101 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABC, abstractmethod
+from typing import Any, TYPE_CHECKING, Iterator, Union
+
+from pyspark.sql.streaming.state_api_client import StateApiClient
+from pyspark.sql.streaming.value_state_client import ValueStateClient
+
+import pandas as pd
+from pyspark.sql.types import (
+StructType, StructField, IntegerType, LongType, ShortType,
+FloatType, DoubleType, DecimalType, StringType, BooleanType,
+DateType, TimestampType
+)
+
+if TYPE_CHECKING:
+from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike
+
+
+class ValueState:

Review Comment:
   We would need docStrings here as users would refer to these classes to 
understand how different State variables work. 



##
sql/api/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalTimeModes.scala:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.streaming
+
+import java.util.Locale
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.streaming.TimeMode
+
+/**
+ * Internal helper class to generate objects representing various `TimeMode`s,
+ */
+private[sql] object InternalTimeModes {

Review Comment:
   this is mainly for parsing from a String to TimeMode? If so, should we add 
this in the `TimeMode.scala` file? 



##
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##
@@ -0,0 +1,152 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+import shutil
+import string
+import sys
+import tempfile
+import pandas as pd
+from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
+from typing import Iterator
+
+import unittest
+from typing import cast
+
+from pyspark import SparkConf
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+from pyspark.sql.types import (
+LongTyp

Re: [PR] [SPARK-48510] Fix for UDAF `toColumn` API when running tests in Maven [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on PR #47368:
URL: https://github.com/apache/spark/pull/47368#issuecomment-223129

   Thank you, @xupefei . cc @HyukjinKwon 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48896][ML][MLLIB] Avoid repartition when writing out the metadata [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on PR #47347:
URL: https://github.com/apache/spark/pull/47347#issuecomment-2231293170

   Thank you, @HyukjinKwon and all.
   Merged to master for Apache Spark 4.0.0-preview2.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48896][ML][MLLIB] Avoid repartition when writing out the metadata [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun closed pull request #47347: [SPARK-48896][ML][MLLIB] Avoid 
repartition when writing out the metadata
URL: https://github.com/apache/spark/pull/47347


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48909][ML][MLLIB] Uses SparkSession over SparkContext when writing metadata [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun closed pull request #47366: [SPARK-48909][ML][MLLIB] Uses 
SparkSession over SparkContext when writing metadata
URL: https://github.com/apache/spark/pull/47366


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP][SPARK-48911] Improve collation support testing for various expressions [spark]

2024-07-16 Thread via GitHub


uros-db opened a new pull request, #47372:
URL: https://github.com/apache/spark/pull/47372

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


ericm-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1679749407


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1116,3 +1121,88 @@ def init_stream_yield_batches(batches):
 batches_to_write = init_stream_yield_batches(serialize_batches())
 
 return ArrowStreamSerializer.dump_stream(self, batches_to_write, 
stream)
+
+
+class TransformWithStateInPandasSerializer(ArrowStreamPandasUDFSerializer):
+"""
+Serializer used by Python worker to evaluate UDF for 
transformWithStateInPandasSerializer.
+
+Parameters
+--
+timezone : str
+A timezone to respect when handling timestamp values
+safecheck : bool
+If True, conversion from Arrow to Pandas checks for overflow/truncation
+assign_cols_by_name : bool
+If True, then Pandas DataFrames will get columns by name
+arrow_max_records_per_batch : int
+Limit of the number of records that can be written to a single 
ArrowRecordBatch in memory.
+"""
+
+def __init__(
+self,
+timezone,
+safecheck,
+assign_cols_by_name,
+arrow_max_records_per_batch):
+super(
+TransformWithStateInPandasSerializer,
+self
+).__init__(timezone, safecheck, assign_cols_by_name)
+
+# self.state_server_port = state_server_port
+
+# # open client connection to state server socket
+self.arrow_max_records_per_batch = arrow_max_records_per_batch
+self.key_offsets = None
+
+# Nothing special here, we need to create the handle and read
+# data in groups.
+def load_stream(self, stream):
+"""
+Read ArrowRecordBatches from stream, deserialize them to populate a 
list of pair
+(data chunk, state), and convert the data into a list of pandas.Series.
+
+Please refer the doc of inner function `gen_data_and_state` for more 
details how
+this function works in overall.
+
+In addition, this function further groups the return of 
`gen_data_and_state` by the state
+instance (same semantic as grouping by grouping key) and produces an 
iterator of data
+chunks for each group, so that the caller can lazily materialize the 
data chunk.
+"""
+import pyarrow as pa
+from itertools import tee
+
+def generate_data_batches(batches):
+for batch in batches:
+data_pandas = [self.arrow_to_pandas(c) for c in 
pa.Table.from_batches([batch]).itercolumns()]

Review Comment:
   Not sure if this is a common pattern in Python, but this line is a little 
hard to read



##
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##
@@ -0,0 +1,152 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+import shutil
+import string
+import sys
+import tempfile
+import pandas as pd
+from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
+from typing import Iterator
+
+import unittest
+from typing import cast
+
+from pyspark import SparkConf
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+from pyspark.sql.types import (
+LongType,
+StringType,
+StructType,
+StructField,
+Row,
+)
+from pyspark.testing.sqlutils import (
+ReusedSQLTestCase,
+have_pandas,
+have_pyarrow,
+pandas_requirement_message,
+pyarrow_requirement_message,
+)
+from pyspark.testing.utils import eventually
+
+
+@unittest.skipIf(
+not have_pandas or not have_pyarrow,
+cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+class TransformWithStateInPandasTestsMixin:
+@classmethod
+def conf(cls):
+cfg = SparkConf()
+cfg.set("spark.sql.shuffle.partitions", "5")
+cfg.set("spark.sql.streaming.stateStore.providerClass",
+
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
+return cfg
+
+def _test_apply_in_pandas_with_state_basic(self, func, check_results):
+input_path = tempfile.mkdtemp()
+
+def prepare_test_resource():
+with open(input_path + "/text-test.txt", "

Re: [PR] [SPARK-48382]Add controller / reconciler module to operator [spark-kubernetes-operator]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on PR #12:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/12#issuecomment-2231421900

   Thank you. Did you finish the updates, @jiangzho ? It seems that there are 
some un-addressed comments.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48900] Add `reason` field for `cancelJobGroup` and `cancelJobsWithTag` [spark]

2024-07-16 Thread via GitHub


mingkangli-db commented on code in PR #47361:
URL: https://github.com/apache/spark/pull/47361#discussion_r1679822519


##
core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala:
##
@@ -65,10 +65,13 @@ private[scheduler] case class JobCancelled(
 
 private[scheduler] case class JobGroupCancelled(
 groupId: String,
+reason: Option[String],

Review Comment:
   Thank you for the suggestions, just addressed all the comments above.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48382]Add controller / reconciler module to operator [spark-kubernetes-operator]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on PR #12:
URL: 
https://github.com/apache/spark-kubernetes-operator/pull/12#issuecomment-2231493319

   - I reviewed the previous comments and resolved when it's addressed. So, 
please go through the remaining open comments. We need to address them.
   - BTW, please re-consider to split this PR once more into architecturally 
smaller and independent ones.
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] add possibility to set log filename & disable spark log rotation [spark]

2024-07-16 Thread via GitHub


Tocard opened a new pull request, #47373:
URL: https://github.com/apache/spark/pull/47373

   As spark cluster administration I want to manage logs of application by my 
own.
   
   Spark-deamon.sh has two main issues with it.
   
   - mandatory log file rotation and no way to disbale it
   - hardcoded log file name without any to way to overide it.
   
   This PR allow without changing the default behavior to customize startup log 
filename & disable log rotation.
   
   I tested it on 3.5 on our own cluster with worker, master, thrift & 
server-history.
   
   
   
   The only changed behavior changed here is SPARK_LOG_MAX_FILES defaulted to 5 
if bad input instaed or fail.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on PR #46873:
URL: https://github.com/apache/spark/pull/46873#issuecomment-2231557976

   Thank you so much, @LuciferYang .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-44790][SQL] XML: to_xml implementation and bindings for python, connect and SQL [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on code in PR #43503:
URL: https://github.com/apache/spark/pull/43503#discussion_r1679886747


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala:
##
@@ -16,164 +16,201 @@
  */
 package org.apache.spark.sql.catalyst.xml
 
+import java.io.Writer
 import java.sql.Timestamp
-import javax.xml.stream.XMLStreamWriter
+import javax.xml.stream.XMLOutputFactory
 
 import scala.collection.Map
 
+import com.sun.xml.txw2.output.IndentingXMLStreamWriter

Review Comment:
   BTW, from JDK community side, this seems to be the unresolved issue.
   - https://bugs.openjdk.org/browse/JDK-8141405 `Public API equivalent to 
com.sun.xml.txw2.output.IndentingXMLStreamWriter`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-44790][SQL] XML: to_xml implementation and bindings for python, connect and SQL [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on code in PR #43503:
URL: https://github.com/apache/spark/pull/43503#discussion_r1679910072


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala:
##
@@ -16,164 +16,201 @@
  */
 package org.apache.spark.sql.catalyst.xml
 
+import java.io.Writer
 import java.sql.Timestamp
-import javax.xml.stream.XMLStreamWriter
+import javax.xml.stream.XMLOutputFactory
 
 import scala.collection.Map
 
+import com.sun.xml.txw2.output.IndentingXMLStreamWriter

Review Comment:
   I filed this. Please use this JIRA ID, @sandip-db .
   
   - SPARK-48913 Avoid `com.sun.xml.txw2.output.IndentingXMLStreamWriter` usage



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-36680][SQL][FOLLOWUP] Files with options should be put into resolveDataSource function [spark]

2024-07-16 Thread via GitHub


szehon-ho commented on code in PR #47370:
URL: https://github.com/apache/spark/pull/47370#discussion_r1679926575


##
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala:
##
@@ -50,7 +52,11 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends 
Rule[LogicalPlan] {
 
   private def resolveDataSource(unresolved: UnresolvedRelation): DataSource = {
 val ident = unresolved.multipartIdentifier
-val dataSource = DataSource(sparkSession, paths = Seq(ident.last), 
className = ident.head)
+val dataSource = DataSource(
+  sparkSession,
+  paths = Seq(ident.last),
+  className = ident.head,
+  options = unresolved.options.asCaseSensitiveMap.asScala.toMap)

Review Comment:
   I think we can remove `.asCaseSensitiveMap` 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [WIP] [SPARK-48900] Add `reason` field for all internal calls for job/stage cancellation [spark]

2024-07-16 Thread via GitHub


mingkangli-db opened a new pull request, #47374:
URL: https://github.com/apache/spark/pull/47374

   
   
   ### What changes were proposed in this pull request?
   
   
   
   ### Why are the changes needed?
   
   
   
   ### Does this PR introduce _any_ user-facing change?
   
   
   
   ### How was this patch tested?
   
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] Corrected row index usage when exploding packed arrays in vectorized reader [spark]

2024-07-16 Thread via GitHub


djspiewak commented on PR #46928:
URL: https://github.com/apache/spark/pull/46928#issuecomment-2231694299

   Is this being held up by anything? Any JIRA would be a fairly trivial 
transliteration of the test case that I added. Note the query and the example 
parquet file. That example does not work today on production Databricks 
instances (resulting an `ArrayIndexOutOfBoundsException`).


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48628][CORE] Add task peak on/off heap memory metrics [spark]

2024-07-16 Thread via GitHub


liuzqt commented on code in PR #47192:
URL: https://github.com/apache/spark/pull/47192#discussion_r1679983248


##
core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java:
##
@@ -202,6 +226,18 @@ public long acquireExecutionMemory(long required, 
MemoryConsumer requestingConsu
 logger.debug("Task {} acquired {} for {}", taskAttemptId, 
Utils.bytesToString(got),
   requestingConsumer);
   }
+
+  if (mode == MemoryMode.OFF_HEAP) {
+synchronized (offHeapMemoryLock) {

Review Comment:
   `acquireExecutionMemory` is synchronized but `releaseExecutionMemory` is not 
synchronized.
   
   While we maintain current memory in both places, we can either
   - `synchronized (this)` on `releaseExecutionMemory`
   - or add another lock, for smaller lock granularity



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48791][CORE][3.4] Fix perf regression caused by the accumulators registration overhead using CopyOnWriteArrayList [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on PR #47348:
URL: https://github.com/apache/spark/pull/47348#issuecomment-2231724205

   This PR seems to be ready, but let's wait until we merge #47297 because we 
need to keep the backporting order `master` -> `branch-3.5` -> `branch-3.4` to 
prevent any future regression.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48791][CORE][3.5] Fix perf regression caused by the accumulators registration overhead using CopyOnWriteArrayList [spark]

2024-07-16 Thread via GitHub


dongjoon-hyun commented on PR #47297:
URL: https://github.com/apache/spark/pull/47297#issuecomment-2231726254

   Given that `branch-3.4` PR works, could you re-trigger CI, @Ngone51 ?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1679998994


##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -358,6 +362,141 @@ def applyInPandasWithState(
 )
 return DataFrame(jdf, self.session)
 
+
+def transformWithStateInPandas(self, 

Review Comment:
   @bogao007 @sahnib - is it possible to just use `transformWithState` here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680002466


##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -358,6 +362,141 @@ def applyInPandasWithState(
 )
 return DataFrame(jdf, self.session)
 
+
+def transformWithStateInPandas(self, 
+stateful_processor: StatefulProcessor,
+outputStructType: Union[StructType, str],
+outputMode: str,
+timeMode: str) -> DataFrame:
+"""
+Invokes methods defined in the stateful processor used in arbitrary 
state API v2.
+We allow the user to act on per-group set of input rows along with 
keyed state and the
+user can choose to output/return 0 or more rows.
+
+For a streaming dataframe, we will repeatedly invoke the interface 
methods for new rows
+in each trigger and the user's state/state variables will be stored 
persistently across
+invocations.
+
+The `stateful_processor` should be a Python class that implements the 
interface defined in
+pyspark.sql.streaming.stateful_processor. The stateful processor 
consists 3 functions:
+`init`, `handleInputRows`, and `close`.

Review Comment:
   we also optionally have `handleInitialState`



##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -33,6 +36,7 @@
 PandasCogroupedMapFunction,
 ArrowGroupedMapFunction,
 ArrowCogroupedMapFunction,
+DataFrameLike as PandasDataFrameLike

Review Comment:
   why do we need to add this ?



##
python/pyspark/sql/streaming/state_api_client.py:
##
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from enum import Enum
+import os
+import socket
+from typing import Any, Union, cast
+
+import pyspark.sql.streaming.StateMessage_pb2 as stateMessage
+from pyspark.serializers import write_int, read_int, UTF8Deserializer
+from pyspark.sql.types import StructType, _parse_datatype_string
+
+
+class StatefulProcessorHandleState(Enum):
+CREATED = 1
+INITIALIZED = 2
+DATA_PROCESSED = 3
+CLOSED = 4
+
+

Review Comment:
   is this a pattern for PySpark in general ?



##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -358,6 +362,55 @@ def applyInPandasWithState(
 )
 return DataFrame(jdf, self.session)
 
+
+def transformWithStateInPandas(self, 
+stateful_processor: StatefulProcessor,
+outputStructType: Union[StructType, str],
+outputMode: str,
+timeMode: str) -> DataFrame:
+
+from pyspark.sql import GroupedData
+from pyspark.sql.functions import pandas_udf
+assert isinstance(self, GroupedData)
+
+def transformWithStateUDF(state_api_client: StateApiClient, key: Any,
+  inputRows: Iterator["PandasDataFrameLike"]) 
-> Iterator["PandasDataFrameLike"]:
+handle = StatefulProcessorHandle(state_api_client)
+
+print(f"checking handle state: {state_api_client.handle_state}")

Review Comment:
   @bogao007 - should we remove these now ?



##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1116,3 +1121,88 @@ def init_stream_yield_batches(batches):
 batches_to_write = init_stream_yield_batches(serialize_batches())
 
 return ArrowStreamSerializer.dump_stream(self, batches_to_write, 
stream)
+
+
+class TransformWithStateInPandasSerializer(ArrowStreamPandasUDFSerializer):
+"""
+Serializer used by Python worker to evaluate UDF for 
transformWithStateInPandasSerializer.
+
+Parameters
+--
+timezone : str
+A timezone to respect when handling timestamp values
+safecheck : bool
+If True, conversion from Arrow to Pandas checks for overflow/truncation
+assign_cols_by_name : bool
+If True, then Pandas DataFrames will get columns by name
+arrow_max_records_per_batch : int
+Limit of the number of records that can be written to a single 
ArrowRecordBatch in memory.
+"""
+
+def __init__(
+self,
+timezone,
+safecheck,
+assign_cols_by_name,
+a

Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680012217


##
python/pyspark/sql/streaming/StateMessage_pb2.pyi:
##
@@ -0,0 +1,139 @@
+from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper

Review Comment:
   @bogao007 @sahnib - i have a general question. how do we choose between 
Protobufv2 and Protobufv3 ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680012933


##
sql/api/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalTimeModes.scala:
##
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.streaming
+
+import java.util.Locale
+
+import org.apache.spark.SparkIllegalArgumentException
+import org.apache.spark.sql.streaming.TimeMode
+
+/**
+ * Internal helper class to generate objects representing various `TimeMode`s,
+ */
+private[sql] object InternalTimeModes {

Review Comment:
   yea +1 - we don't need to duplicate this. I think we can just add as part of 
the SQL API



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680013451


##
python/pyspark/worker.py:
##
@@ -487,6 +489,20 @@ def wrapped(key_series, value_series):
 
 return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))]
 
+def wrap_grouped_transform_with_state_pandas_udf(f, return_type, runner_conf):
+_assign_cols_by_name = assign_cols_by_name(runner_conf)
+
+def wrapped(state_api_client, key, value_series_gen):
+import pandas as pd
+values = (pd.concat(x, axis=1) for x in value_series_gen)
+result_iter = f(state_api_client, key, values)
+
+# TODO: add verification that elements in result_iter are

Review Comment:
   Do other operators do this too ?



##
python/pyspark/util.py:
##
@@ -585,6 +586,7 @@ class PythonEvalType:
 SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE: "PandasGroupedMapUDFWithStateType" 
= 208
 SQL_GROUPED_MAP_ARROW_UDF: "ArrowGroupedMapUDFType" = 209
 SQL_COGROUPED_MAP_ARROW_UDF: "ArrowCogroupedMapUDFType" = 210
+SQL_TRANSFORM_WITH_STATE: "PandasGroupedMapUDFTransformWithStateType" = 211

Review Comment:
   nit: `SQL_TRANSFORM_WITH_STATE_UDF` ?



##
python/pyspark/sql/streaming/state_api_client.py:
##
@@ -0,0 +1,142 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from enum import Enum
+import os
+import socket
+from typing import Any, Union, cast
+
+import pyspark.sql.streaming.StateMessage_pb2 as stateMessage
+from pyspark.serializers import write_int, read_int, UTF8Deserializer
+from pyspark.sql.types import StructType, _parse_datatype_string
+
+
+class StatefulProcessorHandleState(Enum):
+CREATED = 1
+INITIALIZED = 2
+DATA_PROCESSED = 3
+CLOSED = 4
+
+
+class StateApiClient:
+def __init__(
+self,
+state_server_id: int) -> None:
+self._client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+server_address = f'./uds_{state_server_id}.sock'
+self._client_socket.connect(server_address)
+self.sockfile = self._client_socket.makefile("rwb",
+ 
int(os.environ.get("SPARK_BUFFER_SIZE",65536)))
+print(f"client is ready - connection established")
+self.handle_state = StatefulProcessorHandleState.CREATED
+self.utf8_deserializer = UTF8Deserializer()
+
+def set_handle_state(self, state: StatefulProcessorHandleState) -> None:
+print(f"setting handle state to: {state}")
+proto_state = self._get_proto_state(state)
+set_handle_state = stateMessage.SetHandleState(state=proto_state)
+handle_call = 
stateMessage.StatefulProcessorCall(setHandleState=set_handle_state)
+message = stateMessage.StateRequest(statefulProcessorCall=handle_call)
+
+self._send_proto_message(message)
+
+response_message = self._receive_proto_message()
+status = response_message.statusCode
+if (status == 0):
+self.handle_state = state
+else:
+raise Exception(f"Error setting handle state: 
{response_message.errorMessage}")
+print(f"setHandleState status= {status}")
+
+def set_implicit_key(self, key: str) -> None:
+print(f"setting implicit key: {key}")
+set_implicit_key = stateMessage.SetImplicitKey(key=key)
+request = 
stateMessage.ImplicitGroupingKeyRequest(setImplicitKey=set_implicit_key)
+message = stateMessage.StateRequest(implicitGroupingKeyRequest=request)
+
+self._send_proto_message(message)
+response_message = self._receive_proto_message()
+status = response_message.statusCode
+print(f"setImplicitKey status= {status}")
+if (status != 0):
+raise Exception(f"Error setting implicit key: 
{response_message.errorMessage}")
+
+def remove_implicit_key(self) -> None:
+print(f"removing implicit key")
+remove_implicit_key = stateMessage.RemoveImplicitKey()
+request = 
stateMessage.ImplicitGroupingKeyRequest(removeImplicitKey=remove_implicit_key)
+message = stateMessage.StateRequest(implicitGroupingKeyRequest=request)
+
+self._send_p

Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on PR #47133:
URL: https://github.com/apache/spark/pull/47133#issuecomment-2231760411

   @bogao007 - this PR is doing a lot. Could we please atleast a high level 
description of what files are being added, how they are being used and what 
features will work after this change is merged ? Thx


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48700] [SQL] Mode expression for complex types (all collations) [spark]

2024-07-16 Thread via GitHub


GideonPotok commented on code in PR #47154:
URL: https://github.com/apache/spark/pull/47154#discussion_r1680023732


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##
@@ -86,6 +71,78 @@ case class Mode(
 buffer
   }
 
+  private def recursivelyGetBufferForArrayType(
+  a: ArrayType,
+  data: ArrayData): Seq[Any] = {
+(0 until data.numElements()).map { i =>
+  data.get(i, a.elementType) match {
+case k: UTF8String if a.elementType.isInstanceOf[StringType] &&
+  !a.elementType.asInstanceOf[StringType].supportsBinaryEquality
+  => CollationFactory.getCollationKey(k, 
a.elementType.asInstanceOf[StringType].collationId)

Review Comment:
   Done
   



##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala:
##
@@ -86,6 +71,78 @@ case class Mode(
 buffer
   }
 
+  private def recursivelyGetBufferForArrayType(
+  a: ArrayType,
+  data: ArrayData): Seq[Any] = {
+(0 until data.numElements()).map { i =>
+  data.get(i, a.elementType) match {
+case k: UTF8String if a.elementType.isInstanceOf[StringType] &&
+  !a.elementType.asInstanceOf[StringType].supportsBinaryEquality
+  => CollationFactory.getCollationKey(k, 
a.elementType.asInstanceOf[StringType].collationId)
+case k if a.elementType.isInstanceOf[StructType] =>
+  recursivelyGetBufferForStructType(
+
k.asInstanceOf[InternalRow].toSeq(a.elementType.asInstanceOf[StructType]).zip(
+  a.elementType.asInstanceOf[StructType].fields))
+case k if a.elementType.isInstanceOf[ArrayType] =>
+  recursivelyGetBufferForArrayType(
+a.elementType.asInstanceOf[ArrayType],
+k.asInstanceOf[ArrayData])
+case k => k
+  }
+}
+  }
+
+  private def getBufferForComplexType(
+  buffer: OpenHashMap[AnyRef, Long],
+  d: DataType): Iterable[(AnyRef, Long)] = {
+   buffer.groupMapReduce {
+  case (key: InternalRow, _) if d.isInstanceOf[StructType] =>
+recursivelyGetBufferForStructType(key.toSeq(d.asInstanceOf[StructType])
+  .zip(d.asInstanceOf[StructType].fields))
+  case (key: ArrayData, _) if d.isInstanceOf[ArrayType] =>
+recursivelyGetBufferForArrayType(d.asInstanceOf[ArrayType], key)
+}(x => x)((x, y) => (x._1, x._2 + y._2)).values
+  }
+
+  private def isSpecificStringTypeMatch(field: StructField, fieldName: 
String): Boolean =
+field.dataType.isInstanceOf[StringType] &&
+  !field.dataType.asInstanceOf[StringType].supportsBinaryEquality &&

Review Comment:
   Done
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


bogao007 commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680030806


##
python/pyspark/sql/streaming/StateMessage_pb2.pyi:
##
@@ -0,0 +1,139 @@
+from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper

Review Comment:
   Spark Connect was using proto3 when introducing new proto messages, I was 
trying to follow the same. I was not able to find a guide in OSS Spark on which 
version is recommended though. cc @HyukjinKwon 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680017992


##
python/pyspark/sql/streaming/__init__.py:
##
@@ -19,3 +19,4 @@
 from pyspark.sql.streaming.readwriter import DataStreamReader, 
DataStreamWriter  # noqa: F401
 from pyspark.sql.streaming.listener import StreamingQueryListener  # noqa: F401
 from pyspark.errors import StreamingQueryException  # noqa: F401
+from pyspark.sql.streaming.stateful_processor import StatefulProcessor, 
StatefulProcessorHandle # noqa: F401

Review Comment:
   Where is this change used ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[PR] [SPARK-48914][SQL] Add OFFSET operator as an option in the subquery generator [spark]

2024-07-16 Thread via GitHub


averyqi-db opened a new pull request, #47375:
URL: https://github.com/apache/spark/pull/47375

   
   
   ### What changes were proposed in this pull request?
   This adds offset operator in subquery generator suite.
   
   
   ### Why are the changes needed?
   Complete the subquery generator functionality
   
   
   ### Does this PR introduce _any_ user-facing change?
   previously there's no subqueries having offset operator being tested. 
Currently offset operator is added.
   
   
   ### How was this patch tested?
   query test
   
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680039967


##
python/pyspark/sql/streaming/StateMessage_pb2.pyi:
##
@@ -0,0 +1,139 @@
+from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper

Review Comment:
   Oh ok - the generated name was `pb2` - so I thought its using proto2



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680041729


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala:
##
@@ -201,7 +202,7 @@ object ExpressionEncoder {
* object.  Thus, the caller should copy the result before making another 
call if required.
*/
   class Serializer[T](private val expressions: Seq[Expression])
-extends (T => InternalRow) with Serializable {
+extends (T => InternalRow) with Serializable with Logging {

Review Comment:
   yea - can we remove this ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680042299


##
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala:
##
@@ -161,6 +161,41 @@ case class FlatMapGroupsInPandasWithState(
 newChild: LogicalPlan): FlatMapGroupsInPandasWithState = copy(child = 
newChild)
 }
 
+object TransformWithStateInPandas {

Review Comment:
   could we say `TransformWithStateInPython` instead ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680043141


##
sql/core/pom.xml:
##
@@ -240,11 +241,42 @@
   bcpkix-jdk18on
   test
 
+
+  com.github.jnr

Review Comment:
   Could we do this dependency upgrade as a separate change maybe ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


bogao007 commented on PR #47133:
URL: https://github.com/apache/spark/pull/47133#issuecomment-2231795368

   > @bogao007 - this PR is doing a lot. Could we please atleast a high level 
description of what files are being added, how they are being used and what 
features will work after this change is merged ? Thx
   
   Sure, will do.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


bogao007 commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680045141


##
sql/core/pom.xml:
##
@@ -240,11 +241,42 @@
   bcpkix-jdk18on
   test
 
+
+  com.github.jnr

Review Comment:
   This is needed for unix domain socket, I can make the UDS related changes to 
a separate PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680045887


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream, EOFException}
+import java.nio.channels.Channels
+
+import scala.collection.mutable
+
+import com.google.protobuf.ByteString
+import jnr.unixsocket.UnixServerSocketChannel
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Encoder, Encoders, Row}
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, 
StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import 
org.apache.spark.sql.execution.streaming.state.StateMessage.{HandleState, 
ImplicitGroupingKeyRequest, StatefulProcessorCall, StateRequest, StateResponse, 
StateVariableRequest, ValueStateCall}
+import org.apache.spark.sql.streaming.ValueState
+import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType, 
FloatType, IntegerType, LongType, StructType}
+
+/**
+ * This class is used to handle the state requests from the Python side.

Review Comment:
   Could we add some more comments here ?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680046203


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream, EOFException}
+import java.nio.channels.Channels
+
+import scala.collection.mutable
+
+import com.google.protobuf.ByteString
+import jnr.unixsocket.UnixServerSocketChannel
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Encoder, Encoders, Row}
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, 
StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import 
org.apache.spark.sql.execution.streaming.state.StateMessage.{HandleState, 
ImplicitGroupingKeyRequest, StatefulProcessorCall, StateRequest, StateResponse, 
StateVariableRequest, ValueStateCall}
+import org.apache.spark.sql.streaming.ValueState
+import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType, 
FloatType, IntegerType, LongType, StructType}
+
+/**
+ * This class is used to handle the state requests from the Python side.
+ */
+class TransformWithStateInPandasStateServer(
+private val serverChannel: UnixServerSocketChannel,
+private val statefulProcessorHandle: StatefulProcessorHandleImpl,
+private val groupingKeySchema: StructType)
+  extends Runnable
+  with Logging{

Review Comment:
   nit: indent and also space required after `Logging` ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [WIP][SPARK-42204][CORE] Add option to disable redundant logging of TaskMetrics internal accumulators in event logs [spark]

2024-07-16 Thread via GitHub


rednaxelafx commented on PR #39763:
URL: https://github.com/apache/spark/pull/39763#issuecomment-2231799756

   This looks good to me as well (non-binding). Thanks a lot for reviving this 
improvement!
   Agreed that the historical Spark UI itself wouldn't be affected. The REST 
API will get some exposure to the change though. Hopefully there's no client 
out in the wild that relies on this redundant information.
   
   The `JsonProtocolOptions` part looks pragmatic and I'm okay with that. If 
people are really annoyed by that, there's always the path of creating a 
`JsonProtocol` class that wraps the `JsonProtocolOptions` and delegates all 
operations down to the existing companion object. I don't think we have to do 
there here.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680046810


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream, EOFException}
+import java.nio.channels.Channels
+
+import scala.collection.mutable
+
+import com.google.protobuf.ByteString
+import jnr.unixsocket.UnixServerSocketChannel
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Encoder, Encoders, Row}
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, 
StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import 
org.apache.spark.sql.execution.streaming.state.StateMessage.{HandleState, 
ImplicitGroupingKeyRequest, StatefulProcessorCall, StateRequest, StateResponse, 
StateVariableRequest, ValueStateCall}
+import org.apache.spark.sql.streaming.ValueState
+import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType, 
FloatType, IntegerType, LongType, StructType}
+
+/**
+ * This class is used to handle the state requests from the Python side.
+ */
+class TransformWithStateInPandasStateServer(
+private val serverChannel: UnixServerSocketChannel,
+private val statefulProcessorHandle: StatefulProcessorHandleImpl,
+private val groupingKeySchema: StructType)
+  extends Runnable
+  with Logging{
+
+  private var inputStream: DataInputStream = _
+  private var outputStream: DataOutputStream = _
+
+  private val valueStates = mutable.HashMap[String, ValueState[Any]]()
+
+  def run(): Unit = {
+logWarning(s"Waiting for connection from Python worker")

Review Comment:
   nit: do we plan to keep this ? if so - can we add more logs to identify the 
worker process ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680047393


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala:
##
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{DataInputStream, DataOutputStream, EOFException}
+import java.nio.channels.Channels
+
+import scala.collection.mutable
+
+import com.google.protobuf.ByteString
+import jnr.unixsocket.UnixServerSocketChannel
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Encoder, Encoders, Row}
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, 
StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import 
org.apache.spark.sql.execution.streaming.state.StateMessage.{HandleState, 
ImplicitGroupingKeyRequest, StatefulProcessorCall, StateRequest, StateResponse, 
StateVariableRequest, ValueStateCall}
+import org.apache.spark.sql.streaming.ValueState
+import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType, 
FloatType, IntegerType, LongType, StructType}
+
+/**
+ * This class is used to handle the state requests from the Python side.
+ */
+class TransformWithStateInPandasStateServer(
+private val serverChannel: UnixServerSocketChannel,
+private val statefulProcessorHandle: StatefulProcessorHandleImpl,
+private val groupingKeySchema: StructType)
+  extends Runnable
+  with Logging{
+
+  private var inputStream: DataInputStream = _
+  private var outputStream: DataOutputStream = _
+
+  private val valueStates = mutable.HashMap[String, ValueState[Any]]()
+
+  def run(): Unit = {
+logWarning(s"Waiting for connection from Python worker")
+val channel = serverChannel.accept()
+logWarning(s"listening on channel - ${channel.getLocalAddress}")
+
+inputStream = new DataInputStream(
+  Channels.newInputStream(channel))
+outputStream = new DataOutputStream(
+  Channels.newOutputStream(channel)
+)
+
+while (channel.isConnected &&
+  statefulProcessorHandle.getHandleState != 
StatefulProcessorHandleState.CLOSED) {
+
+  try {
+logWarning(s"reading the version")
+val version = inputStream.readInt()
+
+if (version != -1) {
+  logWarning(s"version = ${version}")
+  assert(version == 0)
+  val messageLen = inputStream.readInt()
+  logWarning(s"parsing a message of ${messageLen} bytes")
+
+  val messageBytes = new Array[Byte](messageLen)
+  inputStream.read(messageBytes)
+  logWarning(s"read bytes = ${messageBytes.mkString("Array(", ", ", 
")")}")
+
+  val message = 
StateRequest.parseFrom(ByteString.copyFrom(messageBytes))
+
+  logWarning(s"read message = $message")
+  handleRequest(message)
+  logWarning(s"flush output stream")
+
+  outputStream.flush()
+}
+  } catch {
+case _: EOFException =>
+  logWarning(s"No more data to read from the socket")
+  
statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED)
+  return
+case e: Exception =>
+  logWarning(s"Error reading message: ${e.getMessage}")
+  sendResponse(1, e.getMessage)
+  outputStream.flush()
+  
statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED)
+  return
+  }
+}
+logWarning(s"done from the state server thread")
+  }
+
+  private def handleRequest(message: StateRequest): Unit = {
+if (message.getMethodCase == 
StateRequest.MethodCase.STATEFULPROCESSORCALL) {
+  val statefulProcessorHandleRequest = message.getStatefulProcessorCall
+  if (statefulProcessorHandleRequest.getMethodCase ==
+StatefulProcessorCall.MethodCase.SETHANDLESTATE) {
+val requestedState = 
statefulProcessorHandleRequest.getSetHandleState.getState
+requestedState match {
+  case HandleState.CREATED =>
+logWarning(s"set handle state to Created")
+
statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CREATED)
+  cas

Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680047982


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala:
##
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, 
DataOutputStream}
+import java.net.ServerSocket
+
+import scala.collection.mutable
+import scala.util.control.Breaks.break
+
+import com.google.protobuf.ByteString
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Encoder, Encoders, Row}
+import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, 
StatefulProcessorHandleImpl, StatefulProcessorHandleState}
+import 
org.apache.spark.sql.execution.streaming.state.StateMessage.{HandleState, 
ImplicitGroupingKeyRequest, StatefulProcessorCall, StateRequest, 
StateVariableRequest, ValueStateCall}
+import org.apache.spark.sql.streaming.ValueState
+import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType, 
FloatType, IntegerType, LongType, StringType, StructType}
+
+/**
+ * This class is used to handle the state requests from the Python side.
+ */
+class TransformWithStateInPandasStateServer(
+private val stateServerSocket: ServerSocket,
+private val statefulProcessorHandle: StatefulProcessorHandleImpl,
+private val groupingKeySchema: StructType)
+  extends Runnable
+  with Logging{
+
+  private var inputStream: DataInputStream = _
+  private var outputStream: DataOutputStream = _
+
+  private val valueStates = mutable.HashMap[String, ValueState[String]]()
+
+  def run(): Unit = {
+logWarning(s"Waiting for connection from Python worker")
+val listeningSocket = stateServerSocket.accept()
+logWarning(s"listening on socket - ${listeningSocket.getLocalAddress}")
+
+inputStream = new DataInputStream(
+  new BufferedInputStream(listeningSocket.getInputStream))
+outputStream = new DataOutputStream(
+  new BufferedOutputStream(listeningSocket.getOutputStream)
+)
+
+while (listeningSocket.isConnected &&
+  statefulProcessorHandle.getHandleState != 
StatefulProcessorHandleState.CLOSED) {
+
+  logWarning(s"reading the version")
+  val version = inputStream.readInt()
+
+  if (version != -1) {
+logWarning(s"version = ${version}")
+assert(version == 0)
+val messageLen = inputStream.readInt()
+logWarning(s"parsing a message of ${messageLen} bytes")
+
+val messageBytes = new Array[Byte](messageLen)
+inputStream.read(messageBytes)
+logWarning(s"read bytes = ${messageBytes.mkString("Array(", ", ", 
")")}")
+
+val message = StateRequest.parseFrom(ByteString.copyFrom(messageBytes))
+
+logWarning(s"read message = $message")
+handleRequest(message)
+logWarning(s"flush output stream")
+
+outputStream.flush()
+  }
+}
+
+logWarning(s"done from the state server thread")
+  }
+
+  private def handleRequest(message: StateRequest): Unit = {
+if (message.getMethodCase == 
StateRequest.MethodCase.STATEFULPROCESSORCALL) {
+  val statefulProcessorHandleRequest = message.getStatefulProcessorCall
+  if (statefulProcessorHandleRequest.getMethodCase ==
+StatefulProcessorCall.MethodCase.SETHANDLESTATE) {
+val requestedState = 
statefulProcessorHandleRequest.getSetHandleState.getState
+requestedState match {
+  case HandleState.CREATED =>
+logWarning(s"set handle state to Created")
+
statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CREATED)
+  case HandleState.INITIALIZED =>
+logWarning(s"set handle state to Initialized")
+
statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED)
+  case HandleState.CLOSED =>
+logWarning(s"set handle state to Closed")
+
statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED)
+  case _ =>
+}
+outputStream.writeInt(0)
+  } else if (statefulPr

Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680048469


##
sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasWriter.scala:
##
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.python
+
+import org.apache.arrow.vector.VectorSchemaRoot
+import org.apache.arrow.vector.ipc.ArrowStreamWriter
+
+class TransformWithStateInPandasWriter(

Review Comment:
   nit: where is this used ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680048865


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala:
##
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import org.apache.spark.internal.Logging

Review Comment:
   Could we remove changes in this file entirely ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48903][SS] Set the RocksDB last snapshot version correctly on remote load [spark]

2024-07-16 Thread via GitHub


HeartSaVioR commented on code in PR #47363:
URL: https://github.com/apache/spark/pull/47363#discussion_r1680048571


##
sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala:
##
@@ -1663,9 +1670,8 @@ class RocksDBSuite extends 
AlsoTestWithChangelogCheckpointingEnabled with Shared
 db.load(0)
 db.put("a", "1")
 
-// upload version 1 snapshot created above
+// do not upload version 1 snapshot created previously
 db.doMaintenance()
-assert(snapshotVersionsPresent(remoteDir) == Seq(1))

Review Comment:
   Shall we explicitly test this as this is a one of major spec of change? We 
should confirm that Nil will be returned.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680059077


##
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala:
##
@@ -408,6 +408,9 @@ object StateStoreProvider {
   hadoopConf: Configuration,
   useMultipleValuesPerKey: Boolean): StateStoreProvider = {
 val provider = create(storeConf.providerClass)
+// scalastyle:off println
+println(s"provider config is ${storeConf.providerClass}, all config = 
${storeConf.sqlConfs}")

Review Comment:
   intentional ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1680059417


##
sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInPandasSuite.scala:
##
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.sql.streaming.StreamTest
+import org.apache.spark.sql.types.{LongType, StringType, StructField, 
StructType}
+import 
org.apache.spark.sql.IntegratedUDFTestUtils.TestGroupedMapPandasUDFWithState
+import org.apache.spark.sql.catalyst.expressions.PythonUDF
+import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.tags.SlowSQLTest
+
+@SlowSQLTest
+class TransformWithStateInPandasSuite extends StreamTest {
+
+  test("transformWithStateInPandas - streaming") {

Review Comment:
   Could we add some more tests here ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]

2024-07-16 Thread via GitHub


anishshri-db commented on PR #47133:
URL: https://github.com/apache/spark/pull/47133#issuecomment-2231820006

   @bogao007 - test failure seems related ?
   
   ```
   [error] 
/home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala:51:12:
 class TransformWithStateInPandasExec needs to be abstract.
   [error] Missing implementation for member of trait StatefulOperator:
   [error]   def validateAndMaybeEvolveStateSchema(hadoopConf: 
org.apache.hadoop.conf.Configuration, batchId: Long, stateSchemaVersion: Int): 
Array[String] = ???
   [error] case class TransformWithStateInPandasExec(
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   >