Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]
dongjoon-hyun commented on PR #46873: URL: https://github.com/apache/spark/pull/46873#issuecomment-2230167466 Thank you for pinging me, @LuciferYang .
Re: [PR] [SPARK-48890][CORE][SS] Add Structured Streaming related fields to log4j ThreadContext [spark]
WweiL commented on code in PR #47340: URL: https://github.com/apache/spark/pull/47340#discussion_r1678863246 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala: ## @@ -287,6 +290,11 @@ abstract class StreamExecution( sparkSession.sparkContext.setJobGroup(runId.toString, getBatchDescriptionString, interruptOnCancel = true) sparkSession.sparkContext.setLocalProperty(StreamExecution.QUERY_ID_KEY, id.toString) + loggingThreadContext = CloseableThreadContext.putAll( +Map( + StreamExecution.QUERY_ID_KEY -> id.toString, Review Comment: oh yes I was using that, forgot to push update to github!
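For readers unfamiliar with the API being discussed: log4j2's `CloseableThreadContext` attaches key/value pairs to every log record emitted on the current thread until it is closed. A minimal, self-contained sketch of that pattern — the field names mirror the diff, while the `.asJava` conversion and the UUID values are assumptions, not the PR's exact code:

```scala
import java.util.UUID
import scala.jdk.CollectionConverters._
import org.apache.logging.log4j.CloseableThreadContext

// Hypothetical stand-ins for StreamExecution's id/runId fields.
val queryId = UUID.randomUUID().toString
val runId = UUID.randomUUID().toString

// putAll expects a java.util.Map; every log line emitted while the returned
// instance stays open carries these fields (visible e.g. in a JSON layout).
val loggingThreadContext = CloseableThreadContext.putAll(
  Map(
    "query_id" -> queryId, // assumed key name, mirroring StreamExecution.QUERY_ID_KEY
    "run_id" -> runId
  ).asJava)

try {
  // ... run the micro-batch loop; all logging here inherits the two fields ...
} finally {
  loggingThreadContext.close() // removes the fields from the ThreadContext again
}
```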
Re: [PR] [SPARK-48752][PYTHON][CONNECT][DOCS] Introduce `pyspark.logger` for improved structured logging for PySpark [spark]
itholic commented on code in PR #47145: URL: https://github.com/apache/spark/pull/47145#discussion_r1678866930 ## python/docs/source/development/logger.rst: ## @@ -0,0 +1,151 @@ +.. Licensed to the Apache Software Foundation (ASF) under one +or more contributor license agreements. See the NOTICE file +distributed with this work for additional information +regarding copyright ownership. The ASF licenses this file +to you under the Apache License, Version 2.0 (the +"License"); you may not use this file except in compliance +with the License. You may obtain a copy of the License at + +..http://www.apache.org/licenses/LICENSE-2.0 + +.. Unless required by applicable law or agreed to in writing, +software distributed under the License is distributed on an +"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +KIND, either express or implied. See the License for the +specific language governing permissions and limitations +under the License. + +== +Logging in PySpark +== + +.. currentmodule:: pyspark.logger + +Introduction + + +The :ref:`pyspark.logger` module facilitates structured client-side logging for PySpark users. + +This module includes a :class:`PySparkLogger` class that provides several methods for logging messages at different levels in a structured JSON format: + +- :meth:`PySparkLogger.log_info` +- :meth:`PySparkLogger.log_warn` +- :meth:`PySparkLogger.log_error` + +The logger can be easily configured to write logs to either the console or a specified file. + +Customizing Log Format +== +The default log format is JSON, which includes the timestamp, log level, logger name, and the log message along with any additional context provided. + +Example log entry: + +.. code-block:: python + +{ + "ts": "2024-06-28T10:53:48.528Z", + "level": "ERROR", + "logger": "DataFrameQueryContextLogger", + "msg": "[DIVIDE_BY_ZERO] Division by zero.", + "context": { +"file": "/path/to/file.py", Review Comment: @gengliangwang Just updated PR. Now the log will include `exception` field as below when error occurs: ```json { "ts": "2024-06-28 19:53:48,563", "level": "ERROR", "logger": "DataFrameQueryContextLogger", "msg": "[DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/.../spark/python/test_error_context.py:17\n", "context": { "file": "/.../spark/python/test_error_context.py", "line_no": "17", "fragment": "__truediv__" "error_class": "DIVIDE_BY_ZERO" }, "exception": { "class": "Py4JJavaError", "msg": "An error occurred while calling o52.showString.\n: org.apache.spark.SparkArithmeticException: [DIVIDE_BY_ZERO] Division by zero. Use `try_divide` to tolerate divisor being 0 and return NULL instead. If necessary set \"spark.sql.ansi.enabled\" to \"false\" to bypass this error. 
SQLSTATE: 22012\n== DataFrame ==\n\"__truediv__\" was called from\n/Users/haejoon.lee/Desktop/git_repos/spark/python/test_error_context.py:22\n\n\tat org.apache.spark.sql.errors.QueryExecutionErrors$.divideByZeroError(QueryExecutionErrors.scala:203)\n\tat org.apache.spark.sql.errors.QueryExecutionErrors.divideByZeroError(QueryExecutionErrors.scala)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)\n\tat org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)\n\tat org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)\n \tat org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:50)\n\tat org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:896)\n\tat org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:896)\n\tat org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)\n\tat org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:369)\n\tat org.apache.spark.rdd.RDD.iterator(RDD.scala:333)\n\tat org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)\n\tat org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:171)\n\tat org.apache.spark.scheduler.Task.run(Task.scala:146)\n\tat org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$5(Executor.scala:644)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkE rrorUtils.scala:64)\n\tat org.apache.spark.util.SparkErrorUtils.tryWithSafeFin
Re: [PR] [SPARK-44790][SQL] XML: to_xml implementation and bindings for python, connect and SQL [spark]
dongjoon-hyun commented on code in PR #43503: URL: https://github.com/apache/spark/pull/43503#discussion_r1678869244 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala: ## @@ -16,164 +16,201 @@ */ package org.apache.spark.sql.catalyst.xml +import java.io.Writer import java.sql.Timestamp -import javax.xml.stream.XMLStreamWriter +import javax.xml.stream.XMLOutputFactory import scala.collection.Map +import com.sun.xml.txw2.output.IndentingXMLStreamWriter Review Comment: Thank you for answering, @HyukjinKwon . Let me file a JIRA in order not to forget this. I guess we need a careful look at this kind of dependency before the Apache Spark 4.0.0 release.
Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]
dongjoon-hyun commented on code in PR #46873: URL: https://github.com/apache/spark/pull/46873#discussion_r1678871813 ## core/src/main/scala/org/apache/spark/util/Utils.scala: ## @@ -3072,15 +3072,14 @@ private[spark] object Utils */ lazy val isG1GC: Boolean = { Try { + // SPARK-48505: If the initialization probe of `HotSpotDiagnosticMXBean` is successful, + // the API of `HotSpotDiagnosticMXBean` can be used directly in subsequent operations, + // instead of invoking it through reflection. val clazz = Utils.classForName("com.sun.management.HotSpotDiagnosticMXBean") -.asInstanceOf[Class[_ <: PlatformManagedObject]] - val vmOptionClazz = Utils.classForName("com.sun.management.VMOption") + import com.sun.management.HotSpotDiagnosticMXBean Review Comment: Although the code becomes simpler, I'm not sure this is a better recommended style: `import com.sun.*` in our source code. There is a similar discussion here. - https://github.com/apache/spark/pull/43503#discussion_r1671300283
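For context, the reflection-based shape that the PR would have replaced looks roughly like the sketch below, reconstructed from the `-` lines of the diff; treat the exact wiring as an approximation rather than the Spark source:

```scala
import java.lang.management.{ManagementFactory, PlatformManagedObject}
import scala.util.Try

// Probes the JVM for -XX:+UseG1GC without a compile-time dependency on com.sun.management.*.
lazy val isG1GC: Boolean = Try {
  val clazz = Class.forName("com.sun.management.HotSpotDiagnosticMXBean")
    .asInstanceOf[Class[_ <: PlatformManagedObject]]
  val vmOptionClazz = Class.forName("com.sun.management.VMOption")
  val diagnosticBean = ManagementFactory.getPlatformMXBean(clazz)
  val getVMOption = clazz.getMethod("getVMOption", classOf[String])
  val getValue = vmOptionClazz.getMethod("getValue")
  val useG1GC = getValue.invoke(getVMOption.invoke(diagnosticBean, "UseG1GC")).asInstanceOf[String]
  "true" == useG1GC
}.getOrElse(false)
```

The trade-off in the thread is readability (a direct `import com.sun.management.HotSpotDiagnosticMXBean`) versus keeping non-portable `com.sun.*` types out of compile-time imports.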
Re: [PR] [SPARK-48873][SQL] Use UnsafeRow in JSON parser. [spark]
chenhao-db commented on code in PR #47310: URL: https://github.com/apache/spark/pull/47310#discussion_r1678874248 ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -4367,6 +4367,14 @@ object SQLConf { .booleanConf .createWithDefault(true) + val JSON_USE_UNSAFE_ROW = +buildConf("spark.sql.json.useUnsafeRow") + .internal() + .doc("When set to true, use UnsafeRow to represent struct result in the JSON parser.") + .version("4.0.0") + .booleanConf + .createWithDefault(false) Review Comment: I feel it is better to not enable it by default. The benchmark has shown that applying the change can slow down the JSON scan a bit. If there is enough memory, then saving memory doesn't have enough benefit.
Re: [PR] [SPARK-48873][SQL] Use UnsafeRow in JSON parser. [spark]
chenhao-db commented on PR #47310: URL: https://github.com/apache/spark/pull/47310#issuecomment-2230185352 @HyukjinKwon thanks! could you help merge it?
Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]
dongjoon-hyun commented on PR #46873: URL: https://github.com/apache/spark/pull/46873#issuecomment-2230191426 BTW, I saw the first attempt here and the comment. - #46783 IIUC, he said he is okay with the old code. > I'd say I'm okay with the old reflection-based version for its preciseness.
Re: [PR] [MINOR][TESTS] Remove unused test jar (udf_noA.jar) [spark]
dongjoon-hyun commented on PR #47309: URL: https://github.com/apache/spark/pull/47309#issuecomment-2230213227 😄 Ya, this is the 7th instance. - https://github.com/apache/spark/pulls?q=is%3Apr+is%3Aclosed+is%3Amerged Given that the last one was last year, this is an annual event. - #43188 Do you think we need to prevent this kind of mistake by updating our committer docs or changing the following rule? https://github.com/apache/spark/blob/3923d7f8fcafb86fcbffcedf4215a314fe5b4011/.asf.yaml#L30-L33
Re: [PR] [SPARK-47307][SQL][3.5] Add a config to optionally chunk base64 strings [spark]
dongjoon-hyun commented on PR #47325: URL: https://github.com/apache/spark/pull/47325#issuecomment-2230214873 Thank you, @wForget and @yaooqinn !
Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]
LuciferYang commented on code in PR #46873: URL: https://github.com/apache/spark/pull/46873#discussion_r1678908450 ## core/src/main/scala/org/apache/spark/util/Utils.scala: ## @@ -3072,15 +3072,14 @@ private[spark] object Utils */ lazy val isG1GC: Boolean = { Try { + // SPARK-48505: If the initialization probe of `HotSpotDiagnosticMXBean` is successful, + // the API of `HotSpotDiagnosticMXBean` can be used directly in subsequent operations, + // instead of invoking it through reflection. val clazz = Utils.classForName("com.sun.management.HotSpotDiagnosticMXBean") -.asInstanceOf[Class[_ <: PlatformManagedObject]] - val vmOptionClazz = Utils.classForName("com.sun.management.VMOption") + import com.sun.management.HotSpotDiagnosticMXBean Review Comment: Thanks @dongjoon-hyun, understood, that makes sense. Let's keep things as they are; I will close this PR.
Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]
LuciferYang closed pull request #46873: [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` URL: https://github.com/apache/spark/pull/46873
Re: [PR] [MINOR][TESTS] Remove unused test jar (udf_noA.jar) [spark]
grundprinzip commented on PR #47309: URL: https://github.com/apache/spark/pull/47309#issuecomment-2230264319 Conceptually yes, but I wanted to spend some time understanding the delta between the merge button and the script, and how GH might have changed in between. One idea would be to use a GH action for the script as well, triggered by a keyword for example, so that one does not always need to push directly from a local machine.
Re: [PR] [MINOR][TESTS] Remove unused test jar (udf_noA.jar) [spark]
HyukjinKwon commented on PR #47309: URL: https://github.com/apache/spark/pull/47309#issuecomment-2230287146 Yeah, I think we should block the merge button.
Re: [PR] [SPARK-44790][SQL] XML: to_xml implementation and bindings for python, connect and SQL [spark]
HyukjinKwon commented on code in PR #43503: URL: https://github.com/apache/spark/pull/43503#discussion_r1678954714 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala: ## @@ -16,164 +16,201 @@ */ package org.apache.spark.sql.catalyst.xml +import java.io.Writer import java.sql.Timestamp -import javax.xml.stream.XMLStreamWriter +import javax.xml.stream.XMLOutputFactory import scala.collection.Map +import com.sun.xml.txw2.output.IndentingXMLStreamWriter Review Comment: @sandip-db would you mind taking a look and following up when you find some time?
Re: [PR] [SPARK-48873][SQL] Use UnsafeRow in JSON parser. [spark]
HyukjinKwon commented on PR #47310: URL: https://github.com/apache/spark/pull/47310#issuecomment-2230290146 Merged to master.
Re: [PR] [SPARK-48873][SQL] Use UnsafeRow in JSON parser. [spark]
HyukjinKwon closed pull request #47310: [SPARK-48873][SQL] Use UnsafeRow in JSON parser. URL: https://github.com/apache/spark/pull/47310
Re: [PR] [SPARK-48873][SQL] Use UnsafeRow in JSON parser. [spark]
LuciferYang commented on PR #47310: URL: https://github.com/apache/spark/pull/47310#issuecomment-2230296253 Sorry, forgot to review this one. Late LGTM
Re: [PR] [SPARK-48865][SQL] Add try_url_decode function [spark]
yaooqinn commented on code in PR #47294: URL: https://github.com/apache/spark/pull/47294#discussion_r1678963389 ## sql/core/src/test/resources/sql-tests/inputs/url-functions.sql: ## @@ -17,4 +17,10 @@ select url_encode(null); select url_decode('https%3A%2F%2Fspark.apache.org'); select url_decode('http%3A%2F%2spark.apache.org'); select url_decode('inva lid://user:pass@host/file\\;param?query\\;p2'); -select url_decode(null); \ No newline at end of file +select url_decode(null); + +-- try_url_decode function +select try_url_decode('https%3A%2F%2Fspark.apache.org'); +select try_url_decode('http%3A%2F%2spark.apache.org'); +select try_url_decode('inva lid://user:pass@host/file\\;param?query\\;p2'); +select try_url_decode(null); Review Comment: ```suggestion select try_url_decode(null); ```
Re: [PR] [DO-NOT-MERGE][SQL] Reduce the number of shuffles in SQL in local mode [spark]
pan3793 commented on code in PR #47349: URL: https://github.com/apache/spark/pull/47349#discussion_r1678971316 ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -641,14 +641,19 @@ object SQLConf { .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.") .createOptional + // Lazily get the number of cores to make sure SparkContext created first. + private lazy val defaultShufflePartition: Option[Int] = SparkContext.getActive.flatMap { sc => +if (sc.isLocal) Some(SparkContext.numDriverCores(sc.master)) else None Review Comment: maybe numDriverCores * 2 or 3? In production practice, we usually expect a larger parallelism (2 or 3 times the available executor cores) for a stage to tackle uneven task runtimes.
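To make the suggestion concrete, the diff's local-mode default with the proposed multiplier would look roughly like this (only compilable inside the `org.apache.spark` package, since `getActive` and `numDriverCores` are `private[spark]`; the factor 2 is the reviewer's suggestion, not a settled value):

```scala
import org.apache.spark.SparkContext

// Lazily resolved so the SparkContext exists before we ask for its core count;
// cluster deployments fall back to the regular spark.sql.shuffle.partitions default.
private lazy val defaultShufflePartitions: Option[Int] =
  SparkContext.getActive.flatMap { sc =>
    if (sc.isLocal) {
      // 2x the driver cores to smooth out uneven task runtimes, per the review comment.
      Some(SparkContext.numDriverCores(sc.master) * 2)
    } else {
      None
    }
  }
```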
[PR] [SPARK-48907][SQL] Fix the value `explicitTypes` in `COLLATION_MISMATCH.EXPLICIT` [spark]
panbingkun opened a new pull request, #47365: URL: https://github.com/apache/spark/pull/47365 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-48865][SQL] Add try_url_decode function [spark]
yaooqinn commented on PR #47294: URL: https://github.com/apache/spark/pull/47294#issuecomment-2230342454 Also cc @cloud-fan
Re: [PR] [SPARK-48761][SQL] Introduce clusterBy DataFrameWriter API for Scala [spark]
cloud-fan commented on code in PR #47301: URL: https://github.com/apache/spark/pull/47301#discussion_r1679019564 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala: ## @@ -201,6 +201,22 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) { this } + /** + * Clusters the output by the given columns on the file system. The rows with matching values in Review Comment: let's be a bit more general as data sources are not always based on file system. How about `... given columns on the storage.`?
Re: [PR] [SPARK-48761][SQL] Introduce clusterBy DataFrameWriter API for Scala [spark]
cloud-fan commented on code in PR #47301: URL: https://github.com/apache/spark/pull/47301#discussion_r1679022463 ## connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala: ## @@ -201,6 +201,22 @@ final class DataFrameWriter[T] private[sql] (ds: Dataset[T]) { this } + /** + * Clusters the output by the given columns on the file system. The rows with matching values in + * the specified clustering columns will be consolidated within the same file. Review Comment: ditto, `... will be consolidated within the same group.`
Re: [PR] [SPARK-48761][SQL] Introduce clusterBy DataFrameWriter API for Scala [spark]
cloud-fan commented on code in PR #47301: URL: https://github.com/apache/spark/pull/47301#discussion_r1679027390 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala: ## @@ -209,10 +209,25 @@ object ClusterBySpec { normalizeClusterBySpec(schema, clusterBySpec, resolver).toJson } + /** + * Converts a ClusterBySpec to a map of table properties used to store the clustering + * information in the table catalog. + * + * @param clusterBySpec : existing ClusterBySpec to be converted to properties. + */ + def toProperties(clusterBySpec: ClusterBySpec): Map[String, String] = { Review Comment: what's the difference between this and `toProperty`?
Re: [PR] [SPARK-48761][SQL] Introduce clusterBy DataFrameWriter API for Scala [spark]
cloud-fan commented on code in PR #47301: URL: https://github.com/apache/spark/pull/47301#discussion_r1679031615 ## sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriterV2.scala: ## @@ -104,9 +106,27 @@ final class DataFrameWriterV2[T] private[sql](table: String, ds: Dataset[T]) } this.partitioning = Some(asTransforms) +validatePartitioning() +this + } + + @scala.annotation.varargs + override def clusterBy(colName: String, colNames: String*): CreateTableWriter[T] = { +this.clustering = + Some(ClusterByTransform((colName +: colNames).map(col => FieldReference(col +validatePartitioning() this } + /** + * Validate that clusterBy is not used with partitionBy. + */ + private def validatePartitioning(): Unit = { +if (partitioning.nonEmpty && clustering.nonEmpty) { Review Comment: `DataFrameWriterV2` does not have `bucketBy`?
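For readers following the review, a usage-level sketch of the API under discussion — `clusterBy` is what this PR introduces, so names and placement may still change, and the table identifiers below are hypothetical:

```scala
import org.apache.spark.sql.functions._

val df = spark.range(0, 1000)
  .withColumn("region", lit("emea"))
  .withColumn("event_date", current_date())

// DataFrameWriterV2 path: clusterBy and partitionBy are mutually exclusive,
// which is what validatePartitioning() in the diff enforces.
df.writeTo("catalog.db.events")
  .clusterBy("region", "event_date") // rows with matching values are grouped together on write
  .using("parquet")
  .create()

// Classic DataFrameWriter path proposed in the same change.
df.write
  .clusterBy("region", "event_date")
  .saveAsTable("db.events_clustered")
```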
Re: [PR] [SPARK-48907][SQL] Fix the value `explicitTypes` in `COLLATION_MISMATCH.EXPLICIT` [spark]
panbingkun commented on PR #47365: URL: https://github.com/apache/spark/pull/47365#issuecomment-2230411842 cc @mihailom-db @cloud-fan @HyukjinKwon
Re: [PR] [SPARK-48907][SQL] Fix the value `explicitTypes` in `COLLATION_MISMATCH.EXPLICIT` [spark]
panbingkun commented on code in PR #47365: URL: https://github.com/apache/spark/pull/47365#discussion_r1679059383 ## sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala: ## @@ -3675,7 +3675,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat new AnalysisException( errorClass = "COLLATION_MISMATCH.EXPLICIT", messageParameters = Map( -"explicitTypes" -> toSQLId(explicitTypes) +"explicitTypes" -> explicitTypes.map(toSQLId).mkString(", ") Review Comment: fix it
Re: [PR] [SPARK-48896][ML][MLLIB] Avoid repartition when writing out the metadata [spark]
HyukjinKwon commented on PR #47347: URL: https://github.com/apache/spark/pull/47347#issuecomment-2230473641 Sure, I will separate the PR. Thanks for reviewing this closely 👍
[PR] [SPARK-48909][ML][MLlib] Uses SparkSession over SparkContext when writing metadata [spark]
HyukjinKwon opened a new pull request, #47366: URL: https://github.com/apache/spark/pull/47366 ### What changes were proposed in this pull request? This PR proposes to use SparkSession over SparkContext when writing metadata ### Why are the changes needed? See https://github.com/apache/spark/pull/47347#issuecomment-2229701812 ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests should cover it. ### Was this patch authored or co-authored using generative AI tooling? No.
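The description is terse, so here is a rough illustration of the direction of the change; the `metadataJson` and path values are hypothetical, and the real call sites are the ML reader/writer helpers rather than this standalone snippet:

```scala
val metadataJson = """{"class":"org.apache.spark.ml.feature.SomeModel","sparkVersion":"4.0.0"}"""

// Before (sketch): drive the single-file write through the RDD API on SparkContext.
spark.sparkContext
  .parallelize(Seq(metadataJson), numSlices = 1)
  .saveAsTextFile("/tmp/model-rdd/metadata")

// After (sketch): go through SparkSession and the Dataset API instead.
import spark.implicits._
Seq(metadataJson).toDS().write.text("/tmp/model-ds/metadata")
```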
Re: [PR] [SPARK-48763][CONNECT][BUILD] Move connect server and common to builtin module [spark]
bjornjorgensen commented on PR #47157: URL: https://github.com/apache/spark/pull/47157#issuecomment-2230502432 Oh, my fault, there was something else that did not work as intended on my build system. @HyukjinKwon thanks for double checking.
[PR] [SPARK-48910] Use HashSet to avoid linear searches in PreprocessTableCreation [spark]
vladimirg-db opened a new pull request, #47367: URL: https://github.com/apache/spark/pull/47367 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [DO-NOT-MERGE][SQL] Reduce the number of shuffles in SQL in local mode [spark]
HyukjinKwon commented on code in PR #47349: URL: https://github.com/apache/spark/pull/47349#discussion_r1679158703 ## sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala: ## @@ -641,14 +641,19 @@ object SQLConf { .checkValue(_ > 0, "The value of spark.sql.leafNodeDefaultParallelism must be positive.") .createOptional + // Lazily get the number of cores to make sure SparkContext created first. + private lazy val defaultShufflePartition: Option[Int] = SparkContext.getActive.flatMap { sc => +if (sc.isLocal) Some(SparkContext.numDriverCores(sc.master)) else None Review Comment: This is only for `local[...]`, so I think the more common case is just trying small data out on a local file system - well, maybe that isn't always true either. I will need to run some benchmarks.
Re: [PR] [SPARK-48896][ML][MLLIB] Avoid repartition when writing out the metadata [spark]
HyukjinKwon commented on PR #47347: URL: https://github.com/apache/spark/pull/47347#issuecomment-2230562169 https://github.com/apache/spark/pull/47366 👍
[PR] [SPARK-48510] Fix for UDAF `toColumn` API when running tests in Maven [spark]
xupefei opened a new pull request, #47368: URL: https://github.com/apache/spark/pull/47368 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
[PR] [ONLY TEST][SQL] Improve TPCDSCollationQueryTestSuite [spark]
panbingkun opened a new pull request, #47369: URL: https://github.com/apache/spark/pull/47369 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling?
[PR] [SPARK-36680][SQL][FOLLOWUP] Files with options should be put into resolveDataSource function [spark]
logze opened a new pull request, #47370: URL: https://github.com/apache/spark/pull/47370 ### What changes were proposed in this pull request? When reading csv, json and other files, pass the options parameter to the rules.resolveDataSource method to make the options parameter effective. This is a bug fix for [#46707](https://github.com/apache/spark/pull/46707) ### Why are the changes needed? For the following SQL, the options parameter passed in does not take effect. This is because the rules.resolveDataSource method does not pass the options parameter during the datasource construction process `SELECT * FROM csv.`/test/data.csv` WITH (`header` = true, 'delimiter' = '|')` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test in SQLQuerySuite ### Was this patch authored or co-authored using generative AI tooling? No
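As a point of reference, the equivalent DataFrame reader path where these options already take effect (standard public API; the file path is just an example):

```scala
// header/delimiter must reach the CSV datasource for the first line to be treated
// as column names and '|' as the field separator - the same effect the WITH clause
// above is supposed to have on the SQL path.
val df = spark.read
  .option("header", "true")
  .option("delimiter", "|")
  .csv("/test/data.csv")
df.printSchema()
```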
Re: [PR] [SC-170296] GROUP BY with MapType nested inside complex type [spark]
nebojsa-db commented on code in PR #47331: URL: https://github.com/apache/spark/pull/47331#discussion_r1679332626 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala: ## @@ -892,132 +892,108 @@ case class MapFromEntries(child: Expression) copy(child = newChild) } +// Sorts all MapType expressions based on the ordering of their keys. +// This is used when GROUP BY is done with a MapType (possibly nested) column. case class MapSort(base: Expression) - extends UnaryExpression with NullIntolerant with QueryErrorsBase { + extends UnaryExpression with NullIntolerant with QueryErrorsBase with CodegenFallback { - val keyType: DataType = base.dataType.asInstanceOf[MapType].keyType - val valueType: DataType = base.dataType.asInstanceOf[MapType].valueType + override lazy val canonicalized: Expression = base.canonicalized + + override lazy val deterministic: Boolean = base.deterministic override def child: Expression = base override def dataType: DataType = base.dataType - override def checkInputDataTypes(): TypeCheckResult = base.dataType match { -case m: MapType if RowOrdering.isOrderable(m.keyType) => -TypeCheckResult.TypeCheckSuccess + def recursiveCheckDataTypes(dataType: DataType): TypeCheckResult = dataType match { +case a: ArrayType => recursiveCheckDataTypes(a.elementType) +case StructType(fields) => + fields.collect(sf => recursiveCheckDataTypes(sf.dataType)).filter(_.isFailure).headOption +.getOrElse(TypeCheckResult.TypeCheckSuccess) +case m: MapType if RowOrdering.isOrderable(m.keyType) => TypeCheckResult.TypeCheckSuccess case _: MapType => DataTypeMismatch( errorSubClass = "INVALID_ORDERING_TYPE", messageParameters = Map( "functionName" -> toSQLId(prettyName), - "dataType" -> toSQLType(base.dataType) + "dataType" -> toSQLType(dataType) ) ) -case _ => - DataTypeMismatch( +case _ => TypeCheckResult.TypeCheckSuccess + } + + override def checkInputDataTypes(): TypeCheckResult = { +if (!dataType.existsRecursively(_.isInstanceOf[MapType])) { + return DataTypeMismatch( errorSubClass = "UNEXPECTED_INPUT_TYPE", messageParameters = Map( "paramIndex" -> ordinalNumber(0), "requiredType" -> toSQLType(MapType), "inputSql" -> toSQLExpr(base), "inputType" -> toSQLType(base.dataType)) ) - } - - override def nullSafeEval(array: Any): Any = { -// put keys and their respective values inside a tuple and sort them -// according to the key ordering. 
Extract the new sorted k/v pairs to form a sorted map - -val mapData = array.asInstanceOf[MapData] -val numElements = mapData.numElements() -val keys = mapData.keyArray() -val values = mapData.valueArray() - -val ordering = PhysicalDataType.ordering(keyType) - -val sortedMap = Array - .tabulate(numElements)(i => (keys.get(i, keyType).asInstanceOf[Any], -values.get(i, valueType).asInstanceOf[Any])) - .sortBy(_._1)(ordering) - -new ArrayBasedMapData(new GenericArrayData(sortedMap.map(_._1)), - new GenericArrayData(sortedMap.map(_._2))) - } - - override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { -nullSafeCodeGen(ctx, ev, b => sortCodegen(ctx, ev, b)) - } - - private def sortCodegen(ctx: CodegenContext, ev: ExprCode, - base: String): String = { - -val arrayBasedMapData = classOf[ArrayBasedMapData].getName -val genericArrayData = classOf[GenericArrayData].getName - -val numElements = ctx.freshName("numElements") -val keys = ctx.freshName("keys") -val values = ctx.freshName("values") -val sortArray = ctx.freshName("sortArray") -val i = ctx.freshName("i") -val o1 = ctx.freshName("o1") -val o1entry = ctx.freshName("o1entry") -val o2 = ctx.freshName("o2") -val o2entry = ctx.freshName("o2entry") -val c = ctx.freshName("c") -val newKeys = ctx.freshName("newKeys") -val newValues = ctx.freshName("newValues") - -val boxedKeyType = CodeGenerator.boxedType(keyType) -val boxedValueType = CodeGenerator.boxedType(valueType) -val javaKeyType = CodeGenerator.javaType(keyType) +} -val simpleEntryType = s"java.util.AbstractMap.SimpleEntry<$boxedKeyType, $boxedValueType>" +if (dataType.existsRecursively(dt => + dt.isInstanceOf[MapType] && !RowOrdering.isOrderable(dt.asInstanceOf[MapType].keyType))) { + DataTypeMismatch( +errorSubClass = "INVALID_ORDERING_TYPE", +messageParameters = Map( + "functionName" -> toSQLId(prettyName), + "dataType" -> toSQLType(dataType) +) + ) +} -val comp = if (CodeGenerator.isPrimitiveType(keyType)) { - val v1 = ctx.freshName("v1") - val v2 = ctx.freshName("v2") - s""" - |$javaKeyType $v1 = (($boxedKeyType) $o1).
[PR] [SPARK-47307][DOCS][FOLLOWUP] Add a migration guide for the behavior change of base64 function [spark]
wForget opened a new pull request, #47371: URL: https://github.com/apache/spark/pull/47371 ### What changes were proposed in this pull request? Follow up to #47303 Add a migration guide for the behavior change of `base64` function ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? doc change ### Was this patch authored or co-authored using generative AI tooling? No
Re: [PR] [SPARK-47307][DOCS][FOLLOWUP] Add a migration guide for the behavior change of base64 function [spark]
wForget commented on PR #47371: URL: https://github.com/apache/spark/pull/47371#issuecomment-2230858686 cc @yaooqinn
Re: [PR] [SPARK-48628][CORE] Add task peak on/off heap memory metrics [spark]
Ngone51 commented on code in PR #47192: URL: https://github.com/apache/spark/pull/47192#discussion_r1679431752 ## core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java: ## @@ -202,6 +226,18 @@ public long acquireExecutionMemory(long required, MemoryConsumer requestingConsu logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), requestingConsumer); } + + if (mode == MemoryMode.OFF_HEAP) { +synchronized (offHeapMemoryLock) { Review Comment: The whole function `acquireExecutionMemory` is under the protection of `synchronized (this)`, and the released amount can be obtained from `trySpillAndAcquire()`: ```scala long released = consumerToSpill.spill(requested, requestingConsumer); ``` So I don't think we need an extra lock here.
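A tiny illustration of the reviewer's point, detached from TaskMemoryManager itself (the class below is hypothetical): updates performed inside a method that is already synchronized on `this` are mutually exclusive, so a dedicated per-field lock adds nothing.

```scala
class PeakTracker {
  private var offHeapInUse = 0L
  private var offHeapPeak = 0L

  // Both methods lock `this`, so the read-modify-write of the peak is already atomic
  // with respect to concurrent acquires/releases; no extra offHeapMemoryLock needed.
  def acquire(bytes: Long): Unit = synchronized {
    offHeapInUse += bytes
    offHeapPeak = math.max(offHeapPeak, offHeapInUse)
  }

  def release(bytes: Long): Unit = synchronized {
    offHeapInUse -= bytes
  }
}
```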
Re: [PR] [SPARK-47307][DOCS][FOLLOWUP] Add a migration guide for the behavior change of base64 function [spark]
pan3793 commented on code in PR #47371: URL: https://github.com/apache/spark/pull/47371#discussion_r1679440173 ## docs/sql-migration-guide.md: ## @@ -64,6 +64,7 @@ license: | ## Upgrading from Spark SQL 3.5.1 to 3.5.2 - Since 3.5.2, MySQL JDBC datasource will read TINYINT UNSIGNED as ShortType, while in 3.5.1, it was wrongly read as ByteType. +- Since 3.5.2, the `base64` function will return a non-chunked string. To restore the previous behavior where base64 encoded strings that chunked into lines of at most 76 characters, set `spark.sql.legacy.chunkBase64String.enabled` to `true`. Review Comment: the behavior changed in spark 3.3.0, so "the previous behavior" might be ambiguous.
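To make the documented switch concrete, a small sketch using the flag named in the doc diff, assuming it is a runtime-settable SQL conf (otherwise it would need to be set at session startup); the output shapes are illustrative only:

```scala
// Current default: base64 returns one continuous string with no line breaks.
spark.conf.set("spark.sql.legacy.chunkBase64String.enabled", "false")
spark.sql("SELECT base64(repeat('a', 100)) AS b64").show(truncate = false)

// Legacy behavior: the output is chunked into lines of at most 76 characters.
spark.conf.set("spark.sql.legacy.chunkBase64String.enabled", "true")
spark.sql("SELECT base64(repeat('a', 100)) AS b64").show(truncate = false)
```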
Re: [PR] [SPARK-44790][SQL] XML: to_xml implementation and bindings for python, connect and SQL [spark]
sandip-db commented on code in PR #43503: URL: https://github.com/apache/spark/pull/43503#discussion_r1679453180 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala: ## @@ -16,164 +16,201 @@ */ package org.apache.spark.sql.catalyst.xml +import java.io.Writer import java.sql.Timestamp -import javax.xml.stream.XMLStreamWriter +import javax.xml.stream.XMLOutputFactory import scala.collection.Map +import com.sun.xml.txw2.output.IndentingXMLStreamWriter Review Comment: Ack. I didn't find a drop-in replacement for IndentingXMLStreamWriter. I can look into implementing our own version.
Re: [PR] [SPARK-48388][SQL] Fix SET statement behavior for SQL Scripts [spark]
cloud-fan commented on code in PR #47272: URL: https://github.com/apache/spark/pull/47272#discussion_r1679508361 ## sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4: ## @@ -61,11 +61,18 @@ compoundBody compoundStatement : statement +| compoundSetStatement | beginEndCompoundBlock ; +compoundSetStatement Review Comment: why do we call it `compound`?
Re: [PR] [SPARK-48388][SQL] Fix SET statement behavior for SQL Scripts [spark]
cloud-fan commented on code in PR #47272: URL: https://github.com/apache/spark/pull/47272#discussion_r1679511481 ## sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4: ## @@ -251,26 +258,29 @@ statement | (MSCK)? REPAIR TABLE identifierReference (option=(ADD|DROP|SYNC) PARTITIONS)? #repairTable | op=(ADD | LIST) identifier .*? #manageResource -| SET COLLATION collationName=identifier #setCollation -| SET ROLE .*? #failNativeCommand +| CREATE INDEX (IF errorCapturingNot EXISTS)? identifier ON TABLE? +identifierReference (USING indexType=identifier)? +LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN +(OPTIONS options=propertyList)? #createIndex +| DROP INDEX (IF EXISTS)? identifier ON TABLE? identifierReference #dropIndex +| unsupportedHiveNativeCommands .*? #failNativeCommand +; + +setResetStatement +: SET COLLATION collationName=identifier #setCollation +| SET ROLE .*? #failSetRole | SET TIME ZONE interval #setTimeZone | SET TIME ZONE timezone #setTimeZone | SET TIME ZONE .*? #setTimeZone | SET (VARIABLE | VAR) assignmentList #setVariable | SET (VARIABLE | VAR) LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ - LEFT_PAREN query RIGHT_PAREN #setVariable +LEFT_PAREN query RIGHT_PAREN #setVariable Review Comment: can we avoid duplicating the SET VAR parser rule?
Re: [PR] [SPARK-48861][SQL] Enable shuffle file removal/skipMigration for all SQL executions [spark]
abellina commented on PR #47360: URL: https://github.com/apache/spark/pull/47360#issuecomment-2231084434 @bozhang2820 @cloud-fan fyi
Re: [PR] [SPARK-48791][CORE][3.5] Fix perf regression caused by the accumulators registration overhead using CopyOnWriteArrayList [spark]
LuciferYang commented on PR #47297: URL: https://github.com/apache/spark/pull/47297#issuecomment-2231142709 > `Scala 2.13 build with SBT` failed: > > ``` > [error] /home/runner/work/spark/spark/mllib-local/src/main/scala/org/apache/spark/ml/stat/distribution/MultivariateGaussian.scala:117:34: Symbol 'type scala.collection.compat.package.IterableOnce' is missing from the classpath. > [error] This symbol is required by 'type breeze.linalg.support.LowPrioCanTraverseValues.X'. > [error] Make sure that type IterableOnce is in your classpath and check for conflicting dependencies with `-Ylog-classpath`. > [error] A full rebuild may help if 'LowPrioCanTraverseValues.class' was compiled against an incompatible version of scala.collection.compat.package. > [error] val tol = Utils.EPSILON * max(d) * d.length > [error] ^ > [error] one error found > ``` > > @LuciferYang Do you have any idea why the `mllib` module doesn't have the dependency on `scala-collection-compat` since we have already directly managed that dependency in the `core` module? > > ``` > [INFO] --- dependency:3.6.0:tree (default-cli) @ spark-mllib_2.13 --- > [INFO] org.apache.spark:spark-mllib_2.13:jar:3.5.2-SNAPSHOT > [INFO] +- org.scala-lang.modules:scala-parser-combinators_2.13:jar:2.3.0:compile > [INFO] +- org.apache.spark:spark-core_2.13:jar:3.5.2-SNAPSHOT:compile > [INFO] | \- org.scala-lang.modules:scala-xml_2.13:jar:2.1.0:compile > [INFO] \- org.scala-lang.modules:scala-parallel-collections_2.13:jar:1.0.4:compile > ``` I reset to 45286ca locally, then ran `build/sbt "mllib/dependencyTree" -Pscala-2.13`. In the dependency tree, I can see `[info] | +-org.scala-lang.modules:scala-collection-compat_2.13:2.7.0 [S]`, so I still have no clue about this issue.
Re: [PR] [SPARK-47307][DOCS][FOLLOWUP] Add a migration guide for the behavior change of base64 function [spark]
wForget commented on code in PR #47371: URL: https://github.com/apache/spark/pull/47371#discussion_r1679591679 ## docs/sql-migration-guide.md: ## @@ -64,6 +64,7 @@ license: | ## Upgrading from Spark SQL 3.5.1 to 3.5.2 - Since 3.5.2, MySQL JDBC datasource will read TINYINT UNSIGNED as ShortType, while in 3.5.1, it was wrongly read as ByteType. +- Since 3.5.2, the `base64` function will return a non-chunked string. To restore the previous behavior, where base64 encoded strings were chunked into lines of at most 76 characters, set `spark.sql.legacy.chunkBase64String.enabled` to `true`. Review Comment: Makes sense, I made a little change, thank you. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
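For readers tracking this behavior change, the sketch below shows a quick way to compare the two modes from PySpark. It assumes a local `SparkSession`; the config name is taken from the migration-guide entry quoted above.

```python
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# An input long enough that the pre-3.5.2 behavior would have produced
# several 76-character lines in the encoded output.
long_input = F.repeat(F.lit("a"), 100)

# Since 3.5.2: a single, non-chunked base64 string.
spark.range(1).select(F.base64(long_input).alias("encoded")).show(truncate=False)

# Opt back into the old MIME-style chunking via the legacy flag.
spark.conf.set("spark.sql.legacy.chunkBase64String.enabled", "true")
spark.range(1).select(F.base64(long_input).alias("encoded")).show(truncate=False)
```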
Re: [PR] [SPARK-48388][SQL] Fix SET statement behavior for SQL Scripts [spark]
davidm-db commented on code in PR #47272: URL: https://github.com/apache/spark/pull/47272#discussion_r1679605344 ## sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4: ## @@ -61,11 +61,18 @@ compoundBody compoundStatement : statement +| compoundSetStatement | beginEndCompoundBlock ; +compoundSetStatement Review Comment: To differentiate between the rule used in regular SQL statements (`setResetStatement`, which contains the statements that set config values but also contains `setVariable`) and the rule used in SQL script compounds (which contains only the variant of variable SET meant to be used in scripts). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48388][SQL] Fix SET statement behavior for SQL Scripts [spark]
davidm-db commented on code in PR #47272: URL: https://github.com/apache/spark/pull/47272#discussion_r1679616760 ## sql/api/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBaseParser.g4: ## @@ -251,26 +258,29 @@ statement | (MSCK)? REPAIR TABLE identifierReference (option=(ADD|DROP|SYNC) PARTITIONS)? #repairTable | op=(ADD | LIST) identifier .*? #manageResource -| SET COLLATION collationName=identifier #setCollation -| SET ROLE .*? #failNativeCommand +| CREATE INDEX (IF errorCapturingNot EXISTS)? identifier ON TABLE? +identifierReference (USING indexType=identifier)? +LEFT_PAREN columns=multipartIdentifierPropertyList RIGHT_PAREN +(OPTIONS options=propertyList)? #createIndex +| DROP INDEX (IF EXISTS)? identifier ON TABLE? identifierReference #dropIndex +| unsupportedHiveNativeCommands .*? #failNativeCommand +; + +setResetStatement +: SET COLLATION collationName=identifier #setCollation +| SET ROLE .*? #failSetRole | SET TIME ZONE interval #setTimeZone | SET TIME ZONE timezone #setTimeZone | SET TIME ZONE .*? #setTimeZone | SET (VARIABLE | VAR) assignmentList #setVariable | SET (VARIABLE | VAR) LEFT_PAREN multipartIdentifierList RIGHT_PAREN EQ - LEFT_PAREN query RIGHT_PAREN #setVariable +LEFT_PAREN query RIGHT_PAREN #setVariable Review Comment: Reasons for "duplication" (quotes because it's not exactly the same): - In standalone SQL statements, VAR or VARIABLE is mandatory - In SQL scripts, VAR or VARIABLE is not mandatory - In SQL scripts, we still want to allow VAR or VARIABLE to be "compatible" with standalone SQL statements This means that we can either: - Have one parser rule for set variable statements and reuse it, but handle this logic (and potential syntax exceptions) in the visitor functions - Have this handled in the grammar (one rule has ? where VAR or VARIABLE is not mandatory, the other doesn't) We decided on the second approach, but can change to the first one if you think so. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
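To make the distinction concrete, here is an illustrative sketch of the two surface syntaxes the grammar has to accept. The SQL text is made up and only reflects the rule descriptions above (VAR/VARIABLE mandatory in standalone statements, optional inside a script compound), not the final grammar.

```python
# Standalone statement: the VAR (or VARIABLE) keyword is mandatory.
standalone_sql = """
DECLARE VARIABLE counter INT DEFAULT 0;
SET VAR counter = 42;
"""

# Inside a SQL script compound: the keyword may be omitted, but VAR/VARIABLE
# is still accepted for compatibility with the standalone form.
script_sql = """
BEGIN
  DECLARE counter INT DEFAULT 0;
  SET counter = 42;
END
"""
```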
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
sahnib commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1679584589 ## python/pyspark/worker.py: ## @@ -1609,6 +1645,35 @@ def mapper(a): vals = [a[o] for o in parsed_offsets[0][1]] return f(keys, vals) +elif eval_type == PythonEvalType.SQL_TRANSFORM_WITH_STATE: +from itertools import tee + +# We assume there is only one UDF here because grouped map doesn't +# support combining multiple UDFs. +assert num_udfs == 1 + +# See FlatMapGroupsInPandasExec for how arg_offsets are used to Review Comment: [nit] See TransformWithStateInPandasExec. ## python/pyspark/sql/streaming/stateful_processor.py: ## @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from abc import ABC, abstractmethod +from typing import Any, TYPE_CHECKING, Iterator, Union + +from pyspark.sql.streaming.state_api_client import StateApiClient +from pyspark.sql.streaming.value_state_client import ValueStateClient + +import pandas as pd +from pyspark.sql.types import ( +StructType, StructField, IntegerType, LongType, ShortType, +FloatType, DoubleType, DecimalType, StringType, BooleanType, +DateType, TimestampType +) + +if TYPE_CHECKING: +from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike + + +class ValueState: Review Comment: We would need docStrings here as users would refer to these classes to understand how different State variables work. ## sql/api/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalTimeModes.scala: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.streaming + +import java.util.Locale + +import org.apache.spark.SparkIllegalArgumentException +import org.apache.spark.sql.streaming.TimeMode + +/** + * Internal helper class to generate objects representing various `TimeMode`s, + */ +private[sql] object InternalTimeModes { Review Comment: this is mainly for parsing from a String to TimeMode? If so, should we add this in the `TimeMode.scala` file? 
## python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py: ## @@ -0,0 +1,152 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import random +import shutil +import string +import sys +import tempfile +import pandas as pd +from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle +from typing import Iterator + +import unittest +from typing import cast + +from pyspark import SparkConf +from pyspark.sql.streaming.state import GroupStateTimeout, GroupState +from pyspark.sql.types import ( +LongTyp
Re: [PR] [SPARK-48510] Fix for UDAF `toColumn` API when running tests in Maven [spark]
dongjoon-hyun commented on PR #47368: URL: https://github.com/apache/spark/pull/47368#issuecomment-223129 Thank you, @xupefei . cc @HyukjinKwon -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48896][ML][MLLIB] Avoid repartition when writing out the metadata [spark]
dongjoon-hyun commented on PR #47347: URL: https://github.com/apache/spark/pull/47347#issuecomment-2231293170 Thank you, @HyukjinKwon and all. Merged to master for Apache Spark 4.0.0-preview2. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48896][ML][MLLIB] Avoid repartition when writing out the metadata [spark]
dongjoon-hyun closed pull request #47347: [SPARK-48896][ML][MLLIB] Avoid repartition when writing out the metadata URL: https://github.com/apache/spark/pull/47347 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48909][ML][MLLIB] Uses SparkSession over SparkContext when writing metadata [spark]
dongjoon-hyun closed pull request #47366: [SPARK-48909][ML][MLLIB] Uses SparkSession over SparkContext when writing metadata URL: https://github.com/apache/spark/pull/47366 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [WIP][SPARK-48911] Improve collation support testing for various expressions [spark]
uros-db opened a new pull request, #47372: URL: https://github.com/apache/spark/pull/47372 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
ericm-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1679749407 ## python/pyspark/sql/pandas/serializers.py: ## @@ -1116,3 +1121,88 @@ def init_stream_yield_batches(batches): batches_to_write = init_stream_yield_batches(serialize_batches()) return ArrowStreamSerializer.dump_stream(self, batches_to_write, stream) + + +class TransformWithStateInPandasSerializer(ArrowStreamPandasUDFSerializer): +""" +Serializer used by Python worker to evaluate UDF for transformWithStateInPandasSerializer. + +Parameters +-- +timezone : str +A timezone to respect when handling timestamp values +safecheck : bool +If True, conversion from Arrow to Pandas checks for overflow/truncation +assign_cols_by_name : bool +If True, then Pandas DataFrames will get columns by name +arrow_max_records_per_batch : int +Limit of the number of records that can be written to a single ArrowRecordBatch in memory. +""" + +def __init__( +self, +timezone, +safecheck, +assign_cols_by_name, +arrow_max_records_per_batch): +super( +TransformWithStateInPandasSerializer, +self +).__init__(timezone, safecheck, assign_cols_by_name) + +# self.state_server_port = state_server_port + +# # open client connection to state server socket +self.arrow_max_records_per_batch = arrow_max_records_per_batch +self.key_offsets = None + +# Nothing special here, we need to create the handle and read +# data in groups. +def load_stream(self, stream): +""" +Read ArrowRecordBatches from stream, deserialize them to populate a list of pair +(data chunk, state), and convert the data into a list of pandas.Series. + +Please refer the doc of inner function `gen_data_and_state` for more details how +this function works in overall. + +In addition, this function further groups the return of `gen_data_and_state` by the state +instance (same semantic as grouping by grouping key) and produces an iterator of data +chunks for each group, so that the caller can lazily materialize the data chunk. +""" +import pyarrow as pa +from itertools import tee + +def generate_data_batches(batches): +for batch in batches: +data_pandas = [self.arrow_to_pandas(c) for c in pa.Table.from_batches([batch]).itercolumns()] Review Comment: Not sure if this is a common pattern in Python, but this line is a little hard to read ## python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py: ## @@ -0,0 +1,152 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +import random +import shutil +import string +import sys +import tempfile +import pandas as pd +from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle +from typing import Iterator + +import unittest +from typing import cast + +from pyspark import SparkConf +from pyspark.sql.streaming.state import GroupStateTimeout, GroupState +from pyspark.sql.types import ( +LongType, +StringType, +StructType, +StructField, +Row, +) +from pyspark.testing.sqlutils import ( +ReusedSQLTestCase, +have_pandas, +have_pyarrow, +pandas_requirement_message, +pyarrow_requirement_message, +) +from pyspark.testing.utils import eventually + + +@unittest.skipIf( +not have_pandas or not have_pyarrow, +cast(str, pandas_requirement_message or pyarrow_requirement_message), +) +class TransformWithStateInPandasTestsMixin: +@classmethod +def conf(cls): +cfg = SparkConf() +cfg.set("spark.sql.shuffle.partitions", "5") +cfg.set("spark.sql.streaming.stateStore.providerClass", + "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider") +return cfg + +def _test_apply_in_pandas_with_state_basic(self, func, check_results): +input_path = tempfile.mkdtemp() + +def prepare_test_resource(): +with open(input_path + "/text-test.txt", "
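On the readability point above, an equivalent of the flagged one-liner split into named steps might look like this standalone sketch. Note it uses plain `to_pandas()` in place of the serializer's `arrow_to_pandas` helper, so it skips the timezone/safecheck handling that helper provides.

```python
from typing import List

import pandas as pd
import pyarrow as pa


def record_batch_to_pandas_columns(batch: pa.RecordBatch) -> List[pd.Series]:
    # Wrap the single RecordBatch in a Table, then convert each Arrow
    # column to a pandas Series, one named step at a time.
    table = pa.Table.from_batches([batch])
    return [column.to_pandas() for column in table.itercolumns()]
```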
Re: [PR] [SPARK-48382]Add controller / reconciler module to operator [spark-kubernetes-operator]
dongjoon-hyun commented on PR #12: URL: https://github.com/apache/spark-kubernetes-operator/pull/12#issuecomment-2231421900 Thank you. Did you finish the updates, @jiangzho ? It seems that there are some un-addressed comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48900] Add `reason` field for `cancelJobGroup` and `cancelJobsWithTag` [spark]
mingkangli-db commented on code in PR #47361: URL: https://github.com/apache/spark/pull/47361#discussion_r1679822519 ## core/src/main/scala/org/apache/spark/scheduler/DAGSchedulerEvent.scala: ## @@ -65,10 +65,13 @@ private[scheduler] case class JobCancelled( private[scheduler] case class JobGroupCancelled( groupId: String, +reason: Option[String], Review Comment: Thank you for the suggestions, just addressed all the comments above. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48382]Add controller / reconciler module to operator [spark-kubernetes-operator]
dongjoon-hyun commented on PR #12: URL: https://github.com/apache/spark-kubernetes-operator/pull/12#issuecomment-2231493319 - I reviewed the previous comments and resolved the ones that have been addressed. So, please go through the remaining open comments. We need to address them. - BTW, please reconsider splitting this PR once more into architecturally smaller and independent ones. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] add possibility to set log filename & disable spark log rotation [spark]
Tocard opened a new pull request, #47373: URL: https://github.com/apache/spark/pull/47373 As a Spark cluster administrator, I want to manage application logs on my own. spark-daemon.sh has two main issues: - mandatory log file rotation, with no way to disable it - a hardcoded log file name, with no way to override it. This PR makes it possible to customize the startup log filename and to disable log rotation, without changing the default behavior. I tested it on 3.5 on our own cluster with worker, master, thrift server & history server. The only behavior change here is that SPARK_LOG_MAX_FILES now defaults to 5 on bad input instead of failing. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48505][CORE] Simplify the implementation of `Utils#isG1GC` [spark]
dongjoon-hyun commented on PR #46873: URL: https://github.com/apache/spark/pull/46873#issuecomment-2231557976 Thank you so much, @LuciferYang . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-44790][SQL] XML: to_xml implementation and bindings for python, connect and SQL [spark]
dongjoon-hyun commented on code in PR #43503: URL: https://github.com/apache/spark/pull/43503#discussion_r1679886747 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala: ## @@ -16,164 +16,201 @@ */ package org.apache.spark.sql.catalyst.xml +import java.io.Writer import java.sql.Timestamp -import javax.xml.stream.XMLStreamWriter +import javax.xml.stream.XMLOutputFactory import scala.collection.Map +import com.sun.xml.txw2.output.IndentingXMLStreamWriter Review Comment: BTW, from JDK community side, this seems to be the unresolved issue. - https://bugs.openjdk.org/browse/JDK-8141405 `Public API equivalent to com.sun.xml.txw2.output.IndentingXMLStreamWriter` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-44790][SQL] XML: to_xml implementation and bindings for python, connect and SQL [spark]
dongjoon-hyun commented on code in PR #43503: URL: https://github.com/apache/spark/pull/43503#discussion_r1679910072 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/xml/StaxXmlGenerator.scala: ## @@ -16,164 +16,201 @@ */ package org.apache.spark.sql.catalyst.xml +import java.io.Writer import java.sql.Timestamp -import javax.xml.stream.XMLStreamWriter +import javax.xml.stream.XMLOutputFactory import scala.collection.Map +import com.sun.xml.txw2.output.IndentingXMLStreamWriter Review Comment: I filed this. Please use this JIRA ID, @sandip-db . - SPARK-48913 Avoid `com.sun.xml.txw2.output.IndentingXMLStreamWriter` usage -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-36680][SQL][FOLLOWUP] Files with options should be put into resolveDataSource function [spark]
szehon-ho commented on code in PR #47370: URL: https://github.com/apache/spark/pull/47370#discussion_r1679926575 ## sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/rules.scala: ## @@ -50,7 +52,11 @@ class ResolveSQLOnFile(sparkSession: SparkSession) extends Rule[LogicalPlan] { private def resolveDataSource(unresolved: UnresolvedRelation): DataSource = { val ident = unresolved.multipartIdentifier -val dataSource = DataSource(sparkSession, paths = Seq(ident.last), className = ident.head) +val dataSource = DataSource( + sparkSession, + paths = Seq(ident.last), + className = ident.head, + options = unresolved.options.asCaseSensitiveMap.asScala.toMap) Review Comment: I think we can remove `.asCaseSensitiveMap` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[PR] [WIP] [SPARK-48900] Add `reason` field for all internal calls for job/stage cancellation [spark]
mingkangli-db opened a new pull request, #47374: URL: https://github.com/apache/spark/pull/47374 ### What changes were proposed in this pull request? ### Why are the changes needed? ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? ### Was this patch authored or co-authored using generative AI tooling? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] Corrected row index usage when exploding packed arrays in vectorized reader [spark]
djspiewak commented on PR #46928: URL: https://github.com/apache/spark/pull/46928#issuecomment-2231694299 Is this being held up by anything? Any JIRA would be a fairly trivial transliteration of the test case that I added. Note the query and the example parquet file. That example does not work today on production Databricks instances (resulting in an `ArrayIndexOutOfBoundsException`). -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48628][CORE] Add task peak on/off heap memory metrics [spark]
liuzqt commented on code in PR #47192: URL: https://github.com/apache/spark/pull/47192#discussion_r1679983248 ## core/src/main/java/org/apache/spark/memory/TaskMemoryManager.java: ## @@ -202,6 +226,18 @@ public long acquireExecutionMemory(long required, MemoryConsumer requestingConsu logger.debug("Task {} acquired {} for {}", taskAttemptId, Utils.bytesToString(got), requestingConsumer); } + + if (mode == MemoryMode.OFF_HEAP) { +synchronized (offHeapMemoryLock) { Review Comment: `acquireExecutionMemory` is synchronized but `releaseExecutionMemory` is not synchronized. Since we maintain the current memory in both places, we can either: - `synchronized (this)` on `releaseExecutionMemory` - or add another lock, for smaller lock granularity -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
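A small illustration of the invariant behind this comment (plain Python, not the `TaskMemoryManager` code): whichever lock is chosen, both the acquire and the release path must update the counter under it, otherwise the recorded peak can be computed from a stale current value.

```python
import threading


class PeakMemoryTracker:
    """Tracks current and peak usage; both paths take the same lock."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._current = 0
        self._peak = 0

    def acquire(self, n_bytes: int) -> None:
        with self._lock:
            self._current += n_bytes
            self._peak = max(self._peak, self._current)

    def release(self, n_bytes: int) -> None:
        # Must hold the same lock as acquire(), or a concurrent acquire()
        # may read a stale current value when updating the peak.
        with self._lock:
            self._current -= n_bytes
```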
Re: [PR] [SPARK-48791][CORE][3.4] Fix perf regression caused by the accumulators registration overhead using CopyOnWriteArrayList [spark]
dongjoon-hyun commented on PR #47348: URL: https://github.com/apache/spark/pull/47348#issuecomment-2231724205 This PR seems to be ready, but let's wait until we merge #47297 because we need to keep the backporting order `master` -> `branch-3.5` -> `branch-3.4` to prevent any future regression. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48791][CORE][3.5] Fix perf regression caused by the accumulators registration overhead using CopyOnWriteArrayList [spark]
dongjoon-hyun commented on PR #47297: URL: https://github.com/apache/spark/pull/47297#issuecomment-2231726254 Given that `branch-3.4` PR works, could you re-trigger CI, @Ngone51 ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1679998994 ## python/pyspark/sql/pandas/group_ops.py: ## @@ -358,6 +362,141 @@ def applyInPandasWithState( ) return DataFrame(jdf, self.session) + +def transformWithStateInPandas(self, Review Comment: @bogao007 @sahnib - is it possible to just use `transformWithState` here ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680002466 ## python/pyspark/sql/pandas/group_ops.py: ## @@ -358,6 +362,141 @@ def applyInPandasWithState( ) return DataFrame(jdf, self.session) + +def transformWithStateInPandas(self, +stateful_processor: StatefulProcessor, +outputStructType: Union[StructType, str], +outputMode: str, +timeMode: str) -> DataFrame: +""" +Invokes methods defined in the stateful processor used in arbitrary state API v2. +We allow the user to act on per-group set of input rows along with keyed state and the +user can choose to output/return 0 or more rows. + +For a streaming dataframe, we will repeatedly invoke the interface methods for new rows +in each trigger and the user's state/state variables will be stored persistently across +invocations. + +The `stateful_processor` should be a Python class that implements the interface defined in +pyspark.sql.streaming.stateful_processor. The stateful processor consists 3 functions: +`init`, `handleInputRows`, and `close`. Review Comment: we also optionally have `handleInitialState` ## python/pyspark/sql/pandas/group_ops.py: ## @@ -33,6 +36,7 @@ PandasCogroupedMapFunction, ArrowGroupedMapFunction, ArrowCogroupedMapFunction, +DataFrameLike as PandasDataFrameLike Review Comment: why do we need to add this ? ## python/pyspark/sql/streaming/state_api_client.py: ## @@ -0,0 +1,142 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from enum import Enum +import os +import socket +from typing import Any, Union, cast + +import pyspark.sql.streaming.StateMessage_pb2 as stateMessage +from pyspark.serializers import write_int, read_int, UTF8Deserializer +from pyspark.sql.types import StructType, _parse_datatype_string + + +class StatefulProcessorHandleState(Enum): +CREATED = 1 +INITIALIZED = 2 +DATA_PROCESSED = 3 +CLOSED = 4 + + Review Comment: is this a pattern for PySpark in general ? ## python/pyspark/sql/pandas/group_ops.py: ## @@ -358,6 +362,55 @@ def applyInPandasWithState( ) return DataFrame(jdf, self.session) + +def transformWithStateInPandas(self, +stateful_processor: StatefulProcessor, +outputStructType: Union[StructType, str], +outputMode: str, +timeMode: str) -> DataFrame: + +from pyspark.sql import GroupedData +from pyspark.sql.functions import pandas_udf +assert isinstance(self, GroupedData) + +def transformWithStateUDF(state_api_client: StateApiClient, key: Any, + inputRows: Iterator["PandasDataFrameLike"]) -> Iterator["PandasDataFrameLike"]: +handle = StatefulProcessorHandle(state_api_client) + +print(f"checking handle state: {state_api_client.handle_state}") Review Comment: @bogao007 - should we remove these now ? 
## python/pyspark/sql/pandas/serializers.py: ## @@ -1116,3 +1121,88 @@ def init_stream_yield_batches(batches): batches_to_write = init_stream_yield_batches(serialize_batches()) return ArrowStreamSerializer.dump_stream(self, batches_to_write, stream) + + +class TransformWithStateInPandasSerializer(ArrowStreamPandasUDFSerializer): +""" +Serializer used by Python worker to evaluate UDF for transformWithStateInPandasSerializer. + +Parameters +-- +timezone : str +A timezone to respect when handling timestamp values +safecheck : bool +If True, conversion from Arrow to Pandas checks for overflow/truncation +assign_cols_by_name : bool +If True, then Pandas DataFrames will get columns by name +arrow_max_records_per_batch : int +Limit of the number of records that can be written to a single ArrowRecordBatch in memory. +""" + +def __init__( +self, +timezone, +safecheck, +assign_cols_by_name, +a
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680012217 ## python/pyspark/sql/streaming/StateMessage_pb2.pyi: ## @@ -0,0 +1,139 @@ +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper Review Comment: @bogao007 @sahnib - i have a general question. how do we choose between Protobufv2 and Protobufv3 ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680012933 ## sql/api/src/main/scala/org/apache/spark/sql/catalyst/streaming/InternalTimeModes.scala: ## @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.catalyst.streaming + +import java.util.Locale + +import org.apache.spark.SparkIllegalArgumentException +import org.apache.spark.sql.streaming.TimeMode + +/** + * Internal helper class to generate objects representing various `TimeMode`s, + */ +private[sql] object InternalTimeModes { Review Comment: yea +1 - we don't need to duplicate this. I think we can just add as part of the SQL API -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680013451 ## python/pyspark/worker.py: ## @@ -487,6 +489,20 @@ def wrapped(key_series, value_series): return lambda k, v: [(wrapped(k, v), to_arrow_type(return_type))] +def wrap_grouped_transform_with_state_pandas_udf(f, return_type, runner_conf): +_assign_cols_by_name = assign_cols_by_name(runner_conf) + +def wrapped(state_api_client, key, value_series_gen): +import pandas as pd +values = (pd.concat(x, axis=1) for x in value_series_gen) +result_iter = f(state_api_client, key, values) + +# TODO: add verification that elements in result_iter are Review Comment: Do other operators do this too ? ## python/pyspark/util.py: ## @@ -585,6 +586,7 @@ class PythonEvalType: SQL_GROUPED_MAP_PANDAS_UDF_WITH_STATE: "PandasGroupedMapUDFWithStateType" = 208 SQL_GROUPED_MAP_ARROW_UDF: "ArrowGroupedMapUDFType" = 209 SQL_COGROUPED_MAP_ARROW_UDF: "ArrowCogroupedMapUDFType" = 210 +SQL_TRANSFORM_WITH_STATE: "PandasGroupedMapUDFTransformWithStateType" = 211 Review Comment: nit: `SQL_TRANSFORM_WITH_STATE_UDF` ? ## python/pyspark/sql/streaming/state_api_client.py: ## @@ -0,0 +1,142 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+# + +from enum import Enum +import os +import socket +from typing import Any, Union, cast + +import pyspark.sql.streaming.StateMessage_pb2 as stateMessage +from pyspark.serializers import write_int, read_int, UTF8Deserializer +from pyspark.sql.types import StructType, _parse_datatype_string + + +class StatefulProcessorHandleState(Enum): +CREATED = 1 +INITIALIZED = 2 +DATA_PROCESSED = 3 +CLOSED = 4 + + +class StateApiClient: +def __init__( +self, +state_server_id: int) -> None: +self._client_socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) +server_address = f'./uds_{state_server_id}.sock' +self._client_socket.connect(server_address) +self.sockfile = self._client_socket.makefile("rwb", + int(os.environ.get("SPARK_BUFFER_SIZE",65536))) +print(f"client is ready - connection established") +self.handle_state = StatefulProcessorHandleState.CREATED +self.utf8_deserializer = UTF8Deserializer() + +def set_handle_state(self, state: StatefulProcessorHandleState) -> None: +print(f"setting handle state to: {state}") +proto_state = self._get_proto_state(state) +set_handle_state = stateMessage.SetHandleState(state=proto_state) +handle_call = stateMessage.StatefulProcessorCall(setHandleState=set_handle_state) +message = stateMessage.StateRequest(statefulProcessorCall=handle_call) + +self._send_proto_message(message) + +response_message = self._receive_proto_message() +status = response_message.statusCode +if (status == 0): +self.handle_state = state +else: +raise Exception(f"Error setting handle state: {response_message.errorMessage}") +print(f"setHandleState status= {status}") + +def set_implicit_key(self, key: str) -> None: +print(f"setting implicit key: {key}") +set_implicit_key = stateMessage.SetImplicitKey(key=key) +request = stateMessage.ImplicitGroupingKeyRequest(setImplicitKey=set_implicit_key) +message = stateMessage.StateRequest(implicitGroupingKeyRequest=request) + +self._send_proto_message(message) +response_message = self._receive_proto_message() +status = response_message.statusCode +print(f"setImplicitKey status= {status}") +if (status != 0): +raise Exception(f"Error setting implicit key: {response_message.errorMessage}") + +def remove_implicit_key(self) -> None: +print(f"removing implicit key") +remove_implicit_key = stateMessage.RemoveImplicitKey() +request = stateMessage.ImplicitGroupingKeyRequest(removeImplicitKey=remove_implicit_key) +message = stateMessage.StateRequest(implicitGroupingKeyRequest=request) + +self._send_p
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on PR #47133: URL: https://github.com/apache/spark/pull/47133#issuecomment-2231760411 @bogao007 - this PR is doing a lot. Could we please at least add a high-level description of what files are being added, how they are being used, and what features will work after this change is merged? Thx -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48700] [SQL] Mode expression for complex types (all collations) [spark]
GideonPotok commented on code in PR #47154: URL: https://github.com/apache/spark/pull/47154#discussion_r1680023732 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala: ## @@ -86,6 +71,78 @@ case class Mode( buffer } + private def recursivelyGetBufferForArrayType( + a: ArrayType, + data: ArrayData): Seq[Any] = { +(0 until data.numElements()).map { i => + data.get(i, a.elementType) match { +case k: UTF8String if a.elementType.isInstanceOf[StringType] && + !a.elementType.asInstanceOf[StringType].supportsBinaryEquality + => CollationFactory.getCollationKey(k, a.elementType.asInstanceOf[StringType].collationId) Review Comment: Done ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/aggregate/Mode.scala: ## @@ -86,6 +71,78 @@ case class Mode( buffer } + private def recursivelyGetBufferForArrayType( + a: ArrayType, + data: ArrayData): Seq[Any] = { +(0 until data.numElements()).map { i => + data.get(i, a.elementType) match { +case k: UTF8String if a.elementType.isInstanceOf[StringType] && + !a.elementType.asInstanceOf[StringType].supportsBinaryEquality + => CollationFactory.getCollationKey(k, a.elementType.asInstanceOf[StringType].collationId) +case k if a.elementType.isInstanceOf[StructType] => + recursivelyGetBufferForStructType( + k.asInstanceOf[InternalRow].toSeq(a.elementType.asInstanceOf[StructType]).zip( + a.elementType.asInstanceOf[StructType].fields)) +case k if a.elementType.isInstanceOf[ArrayType] => + recursivelyGetBufferForArrayType( +a.elementType.asInstanceOf[ArrayType], +k.asInstanceOf[ArrayData]) +case k => k + } +} + } + + private def getBufferForComplexType( + buffer: OpenHashMap[AnyRef, Long], + d: DataType): Iterable[(AnyRef, Long)] = { + buffer.groupMapReduce { + case (key: InternalRow, _) if d.isInstanceOf[StructType] => +recursivelyGetBufferForStructType(key.toSeq(d.asInstanceOf[StructType]) + .zip(d.asInstanceOf[StructType].fields)) + case (key: ArrayData, _) if d.isInstanceOf[ArrayType] => +recursivelyGetBufferForArrayType(d.asInstanceOf[ArrayType], key) +}(x => x)((x, y) => (x._1, x._2 + y._2)).values + } + + private def isSpecificStringTypeMatch(field: StructField, fieldName: String): Boolean = +field.dataType.isInstanceOf[StringType] && + !field.dataType.asInstanceOf[StringType].supportsBinaryEquality && Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
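To see what the recursive collation-key handling buys, here is a small end-to-end check one might run on a build that includes this PR. The values are made up; `collate` and `mode` are existing SQL functions, while mode over arrays of collated strings is exactly what this change is adding, so treat the expected result as an assumption.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Under UTF8_LCASE, 'a' and 'A' compare as equal, so the arrays containing
# them should together win the mode over the single 'b' array.
spark.sql("""
    SELECT mode(c) AS most_frequent
    FROM VALUES
        (array(collate('a', 'UTF8_LCASE'))),
        (array(collate('A', 'UTF8_LCASE'))),
        (array(collate('b', 'UTF8_LCASE')))
      AS t(c)
""").show(truncate=False)
```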
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
bogao007 commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680030806 ## python/pyspark/sql/streaming/StateMessage_pb2.pyi: ## @@ -0,0 +1,139 @@ +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper Review Comment: Spark Connect was using proto3 when introducing new proto messages, I was trying to follow the same. I was not able to find a guide in OSS Spark on which version is recommended though. cc @HyukjinKwon -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680017992 ## python/pyspark/sql/streaming/__init__.py: ## @@ -19,3 +19,4 @@ from pyspark.sql.streaming.readwriter import DataStreamReader, DataStreamWriter # noqa: F401 from pyspark.sql.streaming.listener import StreamingQueryListener # noqa: F401 from pyspark.errors import StreamingQueryException # noqa: F401 +from pyspark.sql.streaming.stateful_processor import StatefulProcessor, StatefulProcessorHandle # noqa: F401 Review Comment: Where is this change used ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
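For context on how these newly exported names are meant to be used, here is a rough sketch of a user-defined stateful processor on the Python side, pieced together from the docstrings and tests in this PR. Every signature here (`getValueState`, `exists`, `get`, `update`, and the commented-out `transformWithStateInPandas` call) is an assumption about a still-evolving API, not the final contract.

```python
from typing import Iterator

import pandas as pd

from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
from pyspark.sql.types import LongType, StructField, StructType


class RunningCountProcessor(StatefulProcessor):
    """Keeps a per-key running count in a value state (hypothetical example)."""

    def init(self, handle: StatefulProcessorHandle) -> None:
        # Assumed handle API: register a value state with an explicit schema.
        state_schema = StructType([StructField("count", LongType(), True)])
        self._count = handle.getValueState("count", state_schema)

    def handleInputRows(self, key, rows: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
        count = self._count.get()[0] if self._count.exists() else 0
        for batch in rows:
            count += len(batch)
        self._count.update((count,))
        yield pd.DataFrame({"id": [key[0]], "count": [count]})

    def close(self) -> None:
        pass


# Hypothetical invocation, mirroring the parameters discussed in this PR:
# df.groupBy("id").transformWithStateInPandas(
#     RunningCountProcessor(),
#     outputStructType="id string, count long",
#     outputMode="Update",
#     timeMode="None",
# )
```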
[PR] [SPARK-48914][SQL] Add OFFSET operator as an option in the subquery generator [spark]
averyqi-db opened a new pull request, #47375: URL: https://github.com/apache/spark/pull/47375 ### What changes were proposed in this pull request? This adds the OFFSET operator to the subquery generator suite. ### Why are the changes needed? It completes the subquery generator functionality. ### Does this PR introduce _any_ user-facing change? Previously, no generated subqueries containing the OFFSET operator were tested; now the OFFSET operator is covered. ### How was this patch tested? Query tests. ### Was this patch authored or co-authored using generative AI tooling? No -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
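For a sense of the added coverage, the kind of subquery shape the generator can now emit might look like the following. This is illustrative only; the table and column names are made up, and the actual generated queries will vary.

```python
# An IN subquery whose inner query exercises the OFFSET operator.
generated_query = """
SELECT t1.a
FROM t1
WHERE t1.b IN (
    SELECT t2.b
    FROM t2
    ORDER BY t2.b
    OFFSET 5
)
"""
```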
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680039967 ## python/pyspark/sql/streaming/StateMessage_pb2.pyi: ## @@ -0,0 +1,139 @@ +from google.protobuf.internal import enum_type_wrapper as _enum_type_wrapper Review Comment: Oh ok - the generated name was `pb2` - so I thought its using proto2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680041729 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala: ## @@ -201,7 +202,7 @@ object ExpressionEncoder { * object. Thus, the caller should copy the result before making another call if required. */ class Serializer[T](private val expressions: Seq[Expression]) -extends (T => InternalRow) with Serializable { +extends (T => InternalRow) with Serializable with Logging { Review Comment: yea - can we remove this ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680042299 ## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/pythonLogicalOperators.scala: ## @@ -161,6 +161,41 @@ case class FlatMapGroupsInPandasWithState( newChild: LogicalPlan): FlatMapGroupsInPandasWithState = copy(child = newChild) } +object TransformWithStateInPandas { Review Comment: could we say `TransformWithStateInPython` instead ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680043141 ## sql/core/pom.xml: ## @@ -240,11 +241,42 @@ bcpkix-jdk18on test + + com.github.jnr Review Comment: Could we do this dependency upgrade as a separate change maybe ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
bogao007 commented on PR #47133: URL: https://github.com/apache/spark/pull/47133#issuecomment-2231795368 > @bogao007 - this PR is doing a lot. Could we please at least add a high-level description of what files are being added, how they are being used, and what features will work after this change is merged? Thx Sure, will do. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
bogao007 commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680045141 ## sql/core/pom.xml: ## @@ -240,11 +241,42 @@ bcpkix-jdk18on test + + com.github.jnr Review Comment: This is needed for the Unix domain socket; I can move the UDS-related changes to a separate PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680045887 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import java.io.{DataInputStream, DataOutputStream, EOFException} +import java.nio.channels.Channels + +import scala.collection.mutable + +import com.google.protobuf.ByteString +import jnr.unixsocket.UnixServerSocketChannel + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Encoder, Encoders, Row} +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.execution.streaming.state.StateMessage.{HandleState, ImplicitGroupingKeyRequest, StatefulProcessorCall, StateRequest, StateResponse, StateVariableRequest, ValueStateCall} +import org.apache.spark.sql.streaming.ValueState +import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType, FloatType, IntegerType, LongType, StructType} + +/** + * This class is used to handle the state requests from the Python side. Review Comment: Could we add some more comments here ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680046203 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import java.io.{DataInputStream, DataOutputStream, EOFException} +import java.nio.channels.Channels + +import scala.collection.mutable + +import com.google.protobuf.ByteString +import jnr.unixsocket.UnixServerSocketChannel + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Encoder, Encoders, Row} +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.execution.streaming.state.StateMessage.{HandleState, ImplicitGroupingKeyRequest, StatefulProcessorCall, StateRequest, StateResponse, StateVariableRequest, ValueStateCall} +import org.apache.spark.sql.streaming.ValueState +import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType, FloatType, IntegerType, LongType, StructType} + +/** + * This class is used to handle the state requests from the Python side. + */ +class TransformWithStateInPandasStateServer( +private val serverChannel: UnixServerSocketChannel, +private val statefulProcessorHandle: StatefulProcessorHandleImpl, +private val groupingKeySchema: StructType) + extends Runnable + with Logging{ Review Comment: nit: indent and also space required after `Logging` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
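Assuming the nit is about the standard continuation indent and the missing space before the opening brace, the declaration would presumably end up looking like:

```scala
class TransformWithStateInPandasStateServer(
    private val serverChannel: UnixServerSocketChannel,
    private val statefulProcessorHandle: StatefulProcessorHandleImpl,
    private val groupingKeySchema: StructType)
  extends Runnable
  with Logging {
  // ...
}
```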
Re: [PR] [WIP][SPARK-42204][CORE] Add option to disable redundant logging of TaskMetrics internal accumulators in event logs [spark]
rednaxelafx commented on PR #39763: URL: https://github.com/apache/spark/pull/39763#issuecomment-2231799756 This looks good to me as well (non-binding). Thanks a lot for reviving this improvement! Agreed that the historical Spark UI itself wouldn't be affected. The REST API will get some exposure to the change though. Hopefully there's no client out in the wild that relies on this redundant information. The `JsonProtocolOptions` part looks pragmatic and I'm okay with that. If people are really annoyed by that, there's always the path of creating a `JsonProtocol` class that wraps the `JsonProtocolOptions` and delegates all operations down to the existing companion object. I don't think we have to do that here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
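To make that alternative concrete, the wrapper idea might look roughly like the sketch below. The method name and the options-taking overload are assumptions about what this PR adds to the companion object, not confirmed API:

```scala
import org.apache.spark.scheduler.SparkListenerEvent

// Hypothetical instance-based facade over the existing JsonProtocol companion
// object: it captures a JsonProtocolOptions once and forwards every call,
// threading the captured options through.
class JsonProtocol(options: JsonProtocolOptions) {
  def sparkEventToJsonString(event: SparkListenerEvent): String =
    JsonProtocol.sparkEventToJsonString(event, options)
  // ...the remaining serialization/deserialization methods delegate the same way.
}
```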
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680046810 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import java.io.{DataInputStream, DataOutputStream, EOFException} +import java.nio.channels.Channels + +import scala.collection.mutable + +import com.google.protobuf.ByteString +import jnr.unixsocket.UnixServerSocketChannel + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Encoder, Encoders, Row} +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.execution.streaming.state.StateMessage.{HandleState, ImplicitGroupingKeyRequest, StatefulProcessorCall, StateRequest, StateResponse, StateVariableRequest, ValueStateCall} +import org.apache.spark.sql.streaming.ValueState +import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType, FloatType, IntegerType, LongType, StructType} + +/** + * This class is used to handle the state requests from the Python side. + */ +class TransformWithStateInPandasStateServer( +private val serverChannel: UnixServerSocketChannel, +private val statefulProcessorHandle: StatefulProcessorHandleImpl, +private val groupingKeySchema: StructType) + extends Runnable + with Logging{ + + private var inputStream: DataInputStream = _ + private var outputStream: DataOutputStream = _ + + private val valueStates = mutable.HashMap[String, ValueState[Any]]() + + def run(): Unit = { +logWarning(s"Waiting for connection from Python worker") Review Comment: nit: do we plan to keep this ? if so - can we add more logs to identify the worker process ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
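If this logging stays, a rough sketch of what a more identifying version of those connection logs inside `run()` could look like (purely illustrative; the exact fields to include are up to the PR author):

```scala
import org.apache.spark.TaskContext

// Illustrative sketch only: tie the connection logs to the task that owns this
// state server instead of a generic "waiting for connection" message.
logInfo(s"State server for partition ${TaskContext.getPartitionId()} " +
  s"waiting for a Python worker connection")
val channel = serverChannel.accept()
logInfo(s"Python worker connected for partition ${TaskContext.getPartitionId()} " +
  s"(task attempt ${TaskContext.get().taskAttemptId()}) on ${channel.getLocalAddress}")
```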
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680047393 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala: ## @@ -0,0 +1,287 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import java.io.{DataInputStream, DataOutputStream, EOFException} +import java.nio.channels.Channels + +import scala.collection.mutable + +import com.google.protobuf.ByteString +import jnr.unixsocket.UnixServerSocketChannel + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Encoder, Encoders, Row} +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.execution.streaming.state.StateMessage.{HandleState, ImplicitGroupingKeyRequest, StatefulProcessorCall, StateRequest, StateResponse, StateVariableRequest, ValueStateCall} +import org.apache.spark.sql.streaming.ValueState +import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType, FloatType, IntegerType, LongType, StructType} + +/** + * This class is used to handle the state requests from the Python side. 
+ */ +class TransformWithStateInPandasStateServer( +private val serverChannel: UnixServerSocketChannel, +private val statefulProcessorHandle: StatefulProcessorHandleImpl, +private val groupingKeySchema: StructType) + extends Runnable + with Logging{ + + private var inputStream: DataInputStream = _ + private var outputStream: DataOutputStream = _ + + private val valueStates = mutable.HashMap[String, ValueState[Any]]() + + def run(): Unit = { +logWarning(s"Waiting for connection from Python worker") +val channel = serverChannel.accept() +logWarning(s"listening on channel - ${channel.getLocalAddress}") + +inputStream = new DataInputStream( + Channels.newInputStream(channel)) +outputStream = new DataOutputStream( + Channels.newOutputStream(channel) +) + +while (channel.isConnected && + statefulProcessorHandle.getHandleState != StatefulProcessorHandleState.CLOSED) { + + try { +logWarning(s"reading the version") +val version = inputStream.readInt() + +if (version != -1) { + logWarning(s"version = ${version}") + assert(version == 0) + val messageLen = inputStream.readInt() + logWarning(s"parsing a message of ${messageLen} bytes") + + val messageBytes = new Array[Byte](messageLen) + inputStream.read(messageBytes) + logWarning(s"read bytes = ${messageBytes.mkString("Array(", ", ", ")")}") + + val message = StateRequest.parseFrom(ByteString.copyFrom(messageBytes)) + + logWarning(s"read message = $message") + handleRequest(message) + logWarning(s"flush output stream") + + outputStream.flush() +} + } catch { +case _: EOFException => + logWarning(s"No more data to read from the socket") + statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED) + return +case e: Exception => + logWarning(s"Error reading message: ${e.getMessage}") + sendResponse(1, e.getMessage) + outputStream.flush() + statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED) + return + } +} +logWarning(s"done from the state server thread") + } + + private def handleRequest(message: StateRequest): Unit = { +if (message.getMethodCase == StateRequest.MethodCase.STATEFULPROCESSORCALL) { + val statefulProcessorHandleRequest = message.getStatefulProcessorCall + if (statefulProcessorHandleRequest.getMethodCase == +StatefulProcessorCall.MethodCase.SETHANDLESTATE) { +val requestedState = statefulProcessorHandleRequest.getSetHandleState.getState +requestedState match { + case HandleState.CREATED => +logWarning(s"set handle state to Created") + statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CREATED) + cas
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680047982 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala: ## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import java.io.{BufferedInputStream, BufferedOutputStream, DataInputStream, DataOutputStream} +import java.net.ServerSocket + +import scala.collection.mutable +import scala.util.control.Breaks.break + +import com.google.protobuf.ByteString + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Encoder, Encoders, Row} +import org.apache.spark.sql.execution.streaming.{ImplicitGroupingKeyTracker, StatefulProcessorHandleImpl, StatefulProcessorHandleState} +import org.apache.spark.sql.execution.streaming.state.StateMessage.{HandleState, ImplicitGroupingKeyRequest, StatefulProcessorCall, StateRequest, StateVariableRequest, ValueStateCall} +import org.apache.spark.sql.streaming.ValueState +import org.apache.spark.sql.types.{BooleanType, DataType, DoubleType, FloatType, IntegerType, LongType, StringType, StructType} + +/** + * This class is used to handle the state requests from the Python side. 
+ */ +class TransformWithStateInPandasStateServer( +private val stateServerSocket: ServerSocket, +private val statefulProcessorHandle: StatefulProcessorHandleImpl, +private val groupingKeySchema: StructType) + extends Runnable + with Logging{ + + private var inputStream: DataInputStream = _ + private var outputStream: DataOutputStream = _ + + private val valueStates = mutable.HashMap[String, ValueState[String]]() + + def run(): Unit = { +logWarning(s"Waiting for connection from Python worker") +val listeningSocket = stateServerSocket.accept() +logWarning(s"listening on socket - ${listeningSocket.getLocalAddress}") + +inputStream = new DataInputStream( + new BufferedInputStream(listeningSocket.getInputStream)) +outputStream = new DataOutputStream( + new BufferedOutputStream(listeningSocket.getOutputStream) +) + +while (listeningSocket.isConnected && + statefulProcessorHandle.getHandleState != StatefulProcessorHandleState.CLOSED) { + + logWarning(s"reading the version") + val version = inputStream.readInt() + + if (version != -1) { +logWarning(s"version = ${version}") +assert(version == 0) +val messageLen = inputStream.readInt() +logWarning(s"parsing a message of ${messageLen} bytes") + +val messageBytes = new Array[Byte](messageLen) +inputStream.read(messageBytes) +logWarning(s"read bytes = ${messageBytes.mkString("Array(", ", ", ")")}") + +val message = StateRequest.parseFrom(ByteString.copyFrom(messageBytes)) + +logWarning(s"read message = $message") +handleRequest(message) +logWarning(s"flush output stream") + +outputStream.flush() + } +} + +logWarning(s"done from the state server thread") + } + + private def handleRequest(message: StateRequest): Unit = { +if (message.getMethodCase == StateRequest.MethodCase.STATEFULPROCESSORCALL) { + val statefulProcessorHandleRequest = message.getStatefulProcessorCall + if (statefulProcessorHandleRequest.getMethodCase == +StatefulProcessorCall.MethodCase.SETHANDLESTATE) { +val requestedState = statefulProcessorHandleRequest.getSetHandleState.getState +requestedState match { + case HandleState.CREATED => +logWarning(s"set handle state to Created") + statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CREATED) + case HandleState.INITIALIZED => +logWarning(s"set handle state to Initialized") + statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.INITIALIZED) + case HandleState.CLOSED => +logWarning(s"set handle state to Closed") + statefulProcessorHandle.setHandleState(StatefulProcessorHandleState.CLOSED) + case _ => +} +outputStream.writeInt(0) + } else if (statefulPr
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680048469 ## sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasWriter.scala: ## @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.python + +import org.apache.arrow.vector.VectorSchemaRoot +import org.apache.arrow.vector.ipc.ArrowStreamWriter + +class TransformWithStateInPandasWriter( Review Comment: nit: where is this used ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680048865 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StateTypesEncoderUtils.scala: ## @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.streaming +import org.apache.spark.internal.Logging Review Comment: Could we remove changes in this file entirely ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48903][SS] Set the RocksDB last snapshot version correctly on remote load [spark]
HeartSaVioR commented on code in PR #47363: URL: https://github.com/apache/spark/pull/47363#discussion_r1680048571 ## sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBSuite.scala: ## @@ -1663,9 +1670,8 @@ class RocksDBSuite extends AlsoTestWithChangelogCheckpointingEnabled with Shared db.load(0) db.put("a", "1") -// upload version 1 snapshot created above +// do not upload version 1 snapshot created previously db.doMaintenance() -assert(snapshotVersionsPresent(remoteDir) == Seq(1)) Review Comment: Shall we explicitly test this, since it's one of the major spec changes? We should confirm that Nil will be returned. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
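Concretely, the explicit check being asked for is presumably something like the assertion below, placed right after the maintenance call that now intentionally skips the upload:

```scala
// After doMaintenance() with the snapshot upload skipped, the remote directory
// should contain no snapshot versions at all.
db.doMaintenance()
assert(snapshotVersionsPresent(remoteDir) == Nil)
```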
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680059077 ## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateStore.scala: ## @@ -408,6 +408,9 @@ object StateStoreProvider { hadoopConf: Configuration, useMultipleValuesPerKey: Boolean): StateStoreProvider = { val provider = create(storeConf.providerClass) +// scalastyle:off println +println(s"provider config is ${storeConf.providerClass}, all config = ${storeConf.sqlConfs}") Review Comment: Is this intentional? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on code in PR #47133: URL: https://github.com/apache/spark/pull/47133#discussion_r1680059417 ## sql/core/src/test/scala/org/apache/spark/sql/streaming/TransformWithStateInPandasSuite.scala: ## @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.sql.streaming.StreamTest +import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType} +import org.apache.spark.sql.IntegratedUDFTestUtils.TestGroupedMapPandasUDFWithState +import org.apache.spark.sql.catalyst.expressions.PythonUDF +import org.apache.spark.sql.catalyst.streaming.InternalOutputModes.Update +import org.apache.spark.sql.execution.streaming.MemoryStream +import org.apache.spark.tags.SlowSQLTest + +@SlowSQLTest +class TransformWithStateInPandasSuite extends StreamTest { + + test("transformWithStateInPandas - streaming") { Review Comment: Could we add some more tests here ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
Re: [PR] [SPARK-48755] State V2 base implementation and ValueState support [spark]
anishshri-db commented on PR #47133: URL: https://github.com/apache/spark/pull/47133#issuecomment-2231820006 @bogao007 - this test failure seems related?

```
[error] /home/runner/work/spark/spark/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasExec.scala:51:12: class TransformWithStateInPandasExec needs to be abstract.
[error] Missing implementation for member of trait StatefulOperator:
[error]   def validateAndMaybeEvolveStateSchema(hadoopConf: org.apache.hadoop.conf.Configuration, batchId: Long, stateSchemaVersion: Int): Array[String] = ???
[error] case class TransformWithStateInPandasExec(
```

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
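For reference, the missing member named in the compile error could presumably be satisfied by a stub along these lines in TransformWithStateInPandasExec; the signature is copied from the error message, while the placeholder body is only there to compile and would need the real schema-validation logic:

```scala
import org.apache.hadoop.conf.Configuration

// Placeholder implementation of the StatefulOperator member the compiler is
// complaining about; the real logic should validate (and possibly evolve) the
// state schema the way the Scala transformWithState operator does.
override def validateAndMaybeEvolveStateSchema(
    hadoopConf: Configuration,
    batchId: Long,
    stateSchemaVersion: Int): Array[String] = {
  Array.empty[String]
}
```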