Re: [PR] [SPARK-49479][CORE] Cancel the Timer non-daemon thread on stopping the BarrierCoordinator [spark]
jjayadeep06 commented on code in PR #50020:
URL: https://github.com/apache/spark/pull/50020#discussion_r1983092651

## core/src/main/scala/org/apache/spark/BarrierCoordinator.scala:

```
@@ -80,8 +82,13 @@ private[spark] class BarrierCoordinator(
       states.forEachValue(1, clearStateConsumer)
       states.clear()
       listenerBus.removeListener(listener)
-      ThreadUtils.shutdown(timer)
     } finally {
+      timerFutures.asScala.foreach(_.cancel(true))
+      // Collections.synchronizedList should be synchronized for thread safety
+      timerFutures.synchronized {
+        timerFutures.clear()
```

Review Comment:
   Correct, removed it

## core/src/main/scala/org/apache/spark/BarrierCoordinator.scala:

```
@@ -119,7 +126,9 @@ private[spark] class BarrierCoordinator(
     // A timer task that ensures we may timeout for a barrier() call.
     private var timerTask: TimerTask = null

-    // Init a TimerTask for a barrier() call.
+    /* Init a TimerTask for a barrier() call and also add logic to handle Thread Interruption
```

Review Comment:
   Updated the comment description
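For context, a small self-contained sketch (independent of the PR code) of why traversal of a `Collections.synchronizedList` needs an explicit `synchronized` block while single operations such as `clear()` are already atomic on their own:

```scala
import java.util.Collections
import java.util.concurrent.{Future => JFuture}
import scala.jdk.CollectionConverters._

// A synchronized list like the timerFutures field discussed above.
val timerFutures = Collections.synchronizedList(new java.util.ArrayList[JFuture[_]]())

// Single operations (add, clear, ...) are individually thread-safe:
timerFutures.clear()

// Iteration is not atomic, so the traversal must hold the list's own lock:
timerFutures.synchronized {
  timerFutures.asScala.foreach(_.cancel(true))
}
```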
Re: [PR] [SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future specified mayInterruptIfRunning with true [spark]
srowen commented on code in PR #50209:
URL: https://github.com/apache/spark/pull/50209#discussion_r1986124610

## connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala:

```
@@ -139,7 +139,7 @@ private[consumer] class FetchedDataPool(
   }

   def reset(): Unit = synchronized {
-    scheduled.foreach(_.cancel(true))
+    scheduled.foreach(_.cancel(false))
```

Review Comment:
   Why change this though? If it were interruptible, we'd want to interrupt for sure. What is the issue if you try to interrupt something and it doesn't do anything?
Re: [PR] [SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future specified mayInterruptIfRunning with true [spark]
srowen commented on code in PR #50209:
URL: https://github.com/apache/spark/pull/50209#discussion_r1986206274

## connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala:

```
@@ -139,7 +139,7 @@ private[consumer] class FetchedDataPool(
   }

   def reset(): Unit = synchronized {
-    scheduled.foreach(_.cancel(true))
+    scheduled.foreach(_.cancel(false))
```

Review Comment:
   Is there overhead? It just sets the interrupted status of the thread. I'm worried that, if the code changed in some way that would make it interruptible, then we lose the ability to interrupt on cancel.
Re: [PR] [SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future specified mayInterruptIfRunning with true [spark]
beliefer commented on code in PR #50209:
URL: https://github.com/apache/spark/pull/50209#discussion_r1986207234

## connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala:

```
@@ -139,7 +139,7 @@ private[consumer] class FetchedDataPool(
   }

   def reset(): Unit = synchronized {
-    scheduled.foreach(_.cancel(true))
+    scheduled.foreach(_.cancel(false))
```

Review Comment:
   On the one hand, I want to avoid the overhead of an invalid thread interruption, even if that overhead is not big. On the other hand, I suggest using interrupts reasonably: use the interrupt mechanism only when needed, to avoid excessive dependence on it.
Re: [PR] [SPARK-51338][INFRA] Add automated CI build for `connect-examples` [spark]
srowen commented on PR #50187:
URL: https://github.com/apache/spark/pull/50187#issuecomment-2708415916

Can the examples module simply point to SNAPSHOT versions like everything else in the build? The main branch code is always pointing at unreleased code, but on release, those SNAPSHOT versions are changed to the current release version.
Re: [PR] [SPARK-51338][INFRA] Add automated CI build for `connect-examples` [spark]
LuciferYang commented on PR #50187:
URL: https://github.com/apache/spark/pull/50187#issuecomment-2708419792

> Can the examples module simply point to SNAPSHOT versions like everything else in the build? the main branch code is always pointing at unreleased code, but on release, those SNAPSHOT versions are changed to the current release version.

Yes, if it can be done this way, I think it's ok.
Re: [PR] [SPARK-51338] Add automated CI build for `connect-examples` [spark]
hvanhovell commented on PR #50187:
URL: https://github.com/apache/spark/pull/50187#issuecomment-2704355339

@HyukjinKwon can you take a look?
[PR] Change host to ip [spark]
AryelSouza opened a new pull request, #50216:
URL: https://github.com/apache/spark/pull/50216

An issue was opened about the necessity of changing host to ip in the documentation, because ip would be deprecated.
Re: [PR] [SPARK-51182][SQL] DataFrameWriter should throw dataPathNotSpecifiedError when path is not specified [spark]
vrozov commented on PR #49928:
URL: https://github.com/apache/spark/pull/49928#issuecomment-2708396116

@cloud-fan @LuciferYang I prefer to keep the test in Java, as it does not hurt, and:
1. There is a similar test in R even though it is not R-specific.
2. Other tests in `JavaDataFrameReaderWriterSuite.java` are not Java-specific either.
Re: [PR] [SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future specified mayInterruptIfRunning with true [spark]
beliefer commented on code in PR #50209:
URL: https://github.com/apache/spark/pull/50209#discussion_r1986190446

## connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala:

```
@@ -139,7 +139,7 @@ private[consumer] class FetchedDataPool(
   }

   def reset(): Unit = synchronized {
-    scheduled.foreach(_.cancel(true))
+    scheduled.foreach(_.cancel(false))
```

Review Comment:
   These tasks can't respond to interrupts even if we want to interrupt them. `cancel(true)` causes a thread interrupt, but it will not affect tasks that can't respond to interrupts. In addition, interrupting operations have extra overhead.
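To illustrate the point being debated, here is a minimal, self-contained sketch (not from the PR) showing that `cancel(true)` only has an effect when the running task is actually blocked in an interruptible operation; a task that never checks its interrupt status runs to completion either way:

```scala
import java.util.concurrent.{Executors, TimeUnit}

object CancelDemo extends App {
  val pool = Executors.newFixedThreadPool(2)

  // Interruptible task: Thread.sleep responds to interruption.
  val interruptible = pool.submit(new Runnable {
    override def run(): Unit =
      try Thread.sleep(10000)
      catch { case _: InterruptedException => println("interruptible task: interrupted") }
  })

  // Non-interruptible task: a busy loop that never checks the interrupt status.
  val nonInterruptible = pool.submit(new Runnable {
    override def run(): Unit = {
      val deadline = System.nanoTime() + 2L * 1000 * 1000 * 1000
      while (System.nanoTime() < deadline) { /* spin, ignoring interruption */ }
      println("non-interruptible task: ran to completion")
    }
  })

  Thread.sleep(200)             // let both tasks start
  interruptible.cancel(true)    // the sleeping thread is interrupted right away
  nonInterruptible.cancel(true) // only sets the interrupt flag; the loop still runs ~2s

  pool.shutdown()
  pool.awaitTermination(5, TimeUnit.SECONDS)
}
```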
Re: [PR] [SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future specified mayInterruptIfRunning with true [spark]
srowen commented on code in PR #50209:
URL: https://github.com/apache/spark/pull/50209#discussion_r1986190918

## connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala:

```
@@ -139,7 +139,7 @@ private[consumer] class FetchedDataPool(
   }

   def reset(): Unit = synchronized {
-    scheduled.foreach(_.cancel(true))
+    scheduled.foreach(_.cancel(false))
```

Review Comment:
   If true, then this change doesn't do anything, right? I wonder what problem this change is solving.
Re: [PR] Remove session string calls [spark]
github-actions[bot] commented on PR #48974:
URL: https://github.com/apache/spark/pull/48974#issuecomment-2708582247

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable. If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!
Re: [PR] [SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future specified mayInterruptIfRunning with true [spark]
beliefer commented on code in PR #50209:
URL: https://github.com/apache/spark/pull/50209#discussion_r1986202471

## connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/FetchedDataPool.scala:

```
@@ -139,7 +139,7 @@ private[consumer] class FetchedDataPool(
   }

   def reset(): Unit = synchronized {
-    scheduled.foreach(_.cancel(true))
+    scheduled.foreach(_.cancel(false))
```

Review Comment:
   This PR wants to avoid the overhead of an invalid thread interruption.
Re: [PR] [SPARK-51365][SQL][TESTS] Reduce `SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD/RESULT_QUERY_STAGE_MAX_THREAD_THRESHOLD` for tests related to `SharedSparkSession/TestHive` when using `macOS + Apple S
dongjoon-hyun commented on code in PR #50206:
URL: https://github.com/apache/spark/pull/50206#discussion_r1986220526

## .github/workflows/build_maven_java21_macos15.yml:

```
@@ -36,5 +36,9 @@ jobs:
       os: macos-15
       envs: >-
         {
-          "OBJC_DISABLE_INITIALIZE_FORK_SAFETY": "YES"
+          "OBJC_DISABLE_INITIALIZE_FORK_SAFETY": "YES",
+          "TEST_SQL_SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD": "256",
+          "TEST_SQL_RESULT_QUERY_STAGE_MAX_THREAD_THRESHOLD": "256",
+          "TEST_HIVE_SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD": "48",
+          "TEST_HIVE_RESULT_QUERY_STAGE_MAX_THREAD_THRESHOLD": "48"
```

Review Comment:
   This looks better. In order to avoid any potential conflicts, shall we use `SPARK_TEST_` prefix, @LuciferYang ?

```
$ git grep SPARK_TEST_
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala:      (if (Utils.isTesting) Seq("SPARK_TEST_HADOOP_CONF_DIR") else Nil)
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnClusterSuite.scala:      extraEnv = Map("SPARK_TEST_HADOOP_CONF_DIR" -> customConf.getAbsolutePath()))
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala:  private val sparkTestingDir = Option(System.getProperty(SPARK_TEST_CACHE_DIR_SYSTEM_PROPERTY))
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala:    if (Option(System.getProperty(SPARK_TEST_CACHE_DIR_SYSTEM_PROPERTY)).isEmpty) {
sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveExternalCatalogVersionsSuite.scala:  private val SPARK_TEST_CACHE_DIR_SYSTEM_PROPERTY = "spark.test.cache-dir"
sql/hive/src/test/scala/org/apache/spark/sql/hive/client/HiveClientVersions.scala:  private val testVersions = sys.env.get("SPARK_TEST_HIVE_CLIENT_VERSIONS")
```
Re: [PR] [SPARK-51365][SQL][TESTS] Reduce `SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD/RESULT_QUERY_STAGE_MAX_THREAD_THRESHOLD` for tests related to `SharedSparkSession/TestHive` when using `macOS + Apple S
LuciferYang commented on code in PR #50206:
URL: https://github.com/apache/spark/pull/50206#discussion_r1986220726

## .github/workflows/build_maven_java21_macos15.yml:

```
@@ -36,5 +36,9 @@ jobs:
       os: macos-15
       envs: >-
         {
-          "OBJC_DISABLE_INITIALIZE_FORK_SAFETY": "YES"
+          "OBJC_DISABLE_INITIALIZE_FORK_SAFETY": "YES",
+          "TEST_SQL_SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD": "256",
+          "TEST_SQL_RESULT_QUERY_STAGE_MAX_THREAD_THRESHOLD": "256",
+          "TEST_HIVE_SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD": "48",
+          "TEST_HIVE_RESULT_QUERY_STAGE_MAX_THREAD_THRESHOLD": "48"
```

Review Comment:
   ok
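As a rough sketch of the pattern being agreed on here (the variable name and default value below are illustrative, not necessarily the final ones from the PR), a test helper could read the environment override and fall back to the normal threshold:

```scala
// Hypothetical helper: use the environment override when present,
// otherwise keep the suite's regular threshold.
def testThreadThreshold(envVar: String, default: Int): Int =
  sys.env.get(envVar).map(_.toInt).getOrElse(default)

val shuffleExchangeThreshold =
  testThreadThreshold("SPARK_TEST_SQL_SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD", 1024)
```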
[PR] [SPARK-51444][CORE] Remove unreachable code from `TaskSchedulerImpl#statusUpdate` [spark]
LuciferYang opened a new pull request, #50218:
URL: https://github.com/apache/spark/pull/50218

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?
Re: [PR] [SPARK-51402][SQL][TESTS] Test TimeType in UDF [spark]
MaxGekk commented on code in PR #50194:
URL: https://github.com/apache/spark/pull/50194#discussion_r1986236836

## dev/create-release/release-build.sh:

```
@@ -137,6 +137,12 @@ if [[ "$1" == "finalize" ]]; then
       --repository-url https://upload.pypi.org/legacy/ \
       "pyspark_connect-$PYSPARK_VERSION.tar.gz" \
       "pyspark_connect-$PYSPARK_VERSION.tar.gz.asc"
+    svn update "pyspark-client-$RELEASE_VERSION.tar.gz"
+    svn update "pyspark-client-$RELEASE_VERSION.tar.gz.asc"
+    TWINE_USERNAME=spark-upload TWINE_PASSWORD="$PYPI_PASSWORD" twine upload \
+      --repository-url https://upload.pypi.org/legacy/ \
+      "pyspark-client-$RELEASE_VERSION.tar.gz" \
+      "pyspark-client-$RELEASE_VERSION.tar.gz.asc"
```

Review Comment:
   Please, revert unrelated changes.

## sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala:

```
@@ -1197,6 +1197,35 @@ class UDFSuite extends QueryTest with SharedSparkSession {
       Row(Row(null)))
   }

+  test("SPARK-51402: Test TimeType in UDF") {
+    // Mocks
+    val mockTimeStr = "00:00:00.00"
+    val input = Seq(java.time.LocalTime.parse(mockTimeStr)).toDF("currentTime")
+    // Regular case
+    val plusHour = udf((l: java.time.LocalTime) => l.plusHours(1))
+    val result = input.select(plusHour($"currentTime").cast(TimeType()).as("newTime"))
```

Review Comment:
   I guess the cast is not needed since the type is `TimeType()` already.

## sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala:

```
@@ -862,7 +862,7 @@ class UDFSuite extends QueryTest with SharedSparkSession {
         .select(myUdf1(Column("col"))),
       Row(ArrayBuffer(100)))

-      val myUdf2 = udf((a: immutable.ArraySeq[Int]) =>
+    val myUdf2 = udf((a: immutable.ArraySeq[Int]) =>
```

Review Comment:
   ok, let's leave it but, please, avoid unrelated changes in the future:
   - there are many places in the code base where you could fix indentations. It is better to open a separate PR, and focus only on this.
   - the changes can cause merge conflicts in downstream branches
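For illustration, a minimal sketch of the `LocalTime` UDF discussed above, written without the redundant cast (assuming an active Spark session with `spark.implicits._` in scope and a Spark version that maps `java.time.LocalTime` to `TimeType`):

```scala
import java.time.LocalTime
import org.apache.spark.sql.functions.udf

// The UDF already returns a LocalTime, which maps to TimeType,
// so no extra cast should be needed on the result column.
val plusHour = udf((t: LocalTime) => t.plusHours(1))

val input = Seq(LocalTime.parse("00:00:00.00")).toDF("currentTime")
val result = input.select(plusHour($"currentTime").as("newTime"))
result.show()  // expected value: 01:00:00
```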
Re: [PR] [SPARK-51436][CORE][SQL][K8s][SS] Fix bug that cancel Future specified mayInterruptIfRunning with true [spark]
beliefer commented on PR #50209:
URL: https://github.com/apache/spark/pull/50209#issuecomment-2708241246

ping @srowen @dongjoon-hyun @LuciferYang
Re: [PR] [SPARK-51429][Connect] Add "Acknowledgement" message to ExecutePlanResponse [spark]
vicennial commented on PR #50193:
URL: https://github.com/apache/spark/pull/50193#issuecomment-2708400817

Putting this on hold ATM as some unexpected complications popped up (particularly the interactions with response indices and response caching).
Re: [PR] [SPARK-51298][SQL] Support variant in CSV scan [spark]
sandip-db commented on code in PR #50052:
URL: https://github.com/apache/spark/pull/50052#discussion_r1986121542

## sql/core/src/test/scala/org/apache/spark/sql/CsvFunctionsSuite.scala:

```
@@ -760,12 +760,9 @@ class CsvFunctionsSuite extends QueryTest with SharedSparkSession {
       context = ExpectedContext(fragment = "to_csv", getCurrentClassCallSitePattern)
     )

-    checkError(
-      exception = intercept[SparkUnsupportedOperationException] {
-        df.select(from_csv(lit("data"), valueSchema, Map.empty[String, String])).collect()
-      },
-      condition = "UNSUPPORTED_DATATYPE",
-      parameters = Map("typeName" -> "\"VARIANT\"")
+    checkAnswer(
+      df.select(from_csv(lit("1,2,3"), valueSchema, Map.empty[String, String])),
+      Seq(Row(Row(1L, "2", new VariantVal(Array[Byte](12, 3), Array[Byte](1, 0, 0))))))
```

Review Comment:
   Move this to a new test and also add a `singleVariantColumn` scenario for `from_csv`.
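As a rough illustration of the usage the new test exercises (a sketch, not the actual suite code; the field names in the schema are assumed, only the shape is taken from the diff above):

```scala
import org.apache.spark.sql.functions.{from_csv, lit}
import org.apache.spark.sql.types._

// A schema whose last field is a variant column, as in valueSchema above.
val valueSchema = StructType(Seq(
  StructField("a", LongType),
  StructField("b", StringType),
  StructField("c", VariantType)))

// "1" and "2" are parsed into the concrete types, while "3" is captured
// as a variant value instead of raising UNSUPPORTED_DATATYPE.
spark.range(1)
  .select(from_csv(lit("1,2,3"), valueSchema, Map.empty[String, String]).as("parsed"))
  .show(truncate = false)
```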
Re: [PR] [SPARK-50763][SQL] Add Analyzer rule for resolving SQL table functions [spark]
cloud-fan commented on code in PR #49471:
URL: https://github.com/apache/spark/pull/49471#discussion_r1982863500

## sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala:

```
@@ -1675,6 +1676,91 @@ class SessionCatalog(
     }
   }

+  /**
+   * Constructs a SQL table function plan.
+   *
+   * Example SQL table function:
+   *
+   *   CREATE FUNCTION foo(x INT) RETURNS TABLE(a INT) RETURN SELECT x + 1 AS x1
+   *
+   * Query:
+   *
+   *   SELECT * FROM foo(1);
+   *
+   * Plan:
+   *
+   *   Project [CAST(x1 AS INT) AS a]
+   *   +- LateralJoin lateral-subquery [x]
+   *      :  +- Project [(outer(x) + 1) AS x1]
+   *      :     +- OneRowRelation
+   *      +- Project [CAST(1 AS INT) AS x]
+   *         +- OneRowRelation
+   */
+  def makeSQLTableFunctionPlan(
+      name: String,
+      function: SQLFunction,
+      input: Seq[Expression],
+      outputAttrs: Seq[Attribute]): LogicalPlan = {
+    assert(function.isTableFunc)
+    val funcName = function.name.funcName
+
+    // Use captured SQL configs when parsing a SQL function.
+    val conf = new SQLConf()
+    function.getSQLConfigs.foreach { case (k, v) => conf.settings.put(k, v) }
+    SQLConf.withExistingConf(conf) {
+      val inputParam = function.inputParam
+      val returnParam = function.getTableFuncReturnCols
+      val (_, query) = function.getExpressionAndQuery(parser, isTableFunc = true)
+      assert(query.isDefined)
+
+      // Check function arguments
+      val paramSize = inputParam.map(_.size).getOrElse(0)
+      if (input.size > paramSize) {
+        throw QueryCompilationErrors.wrongNumArgsError(
+          name, paramSize.toString, input.size)
+      }
+
+      val body = if (inputParam.isDefined) {
+        val param = inputParam.get
+        // Attributes referencing the input parameters inside the function can use the
+        // function name as a qualifier.
+        val qualifier = Seq(funcName)
+        val paddedInput = input ++
+          param.takeRight(paramSize - input.size).map { p =>
+            val defaultExpr = p.getDefault()
+            if (defaultExpr.isDefined) {
+              Cast(parseDefault(defaultExpr.get, parser), p.dataType)
+            } else {
+              throw QueryCompilationErrors.wrongNumArgsError(
+                name, paramSize.toString, input.size)
+            }
+          }
+
+        val inputCast = paddedInput.zip(param.fields).map {
+          case (expr, param) =>
+            // Add outer references to all attributes in the function input.
+            val outer = expr.transform {
+              case a: Attribute => OuterReference(a)
+            }
+            Alias(Cast(outer, param.dataType), param.name)(qualifier = qualifier)
```

Review Comment:
   So a simple way is we don't add cast in https://github.com/apache/spark/pull/49471/files#diff-9dd0899e5406230aeff96654432da54f35255f6dc60eecb87264a5c508a8c826R1732
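To make the example from the doc comment above concrete, a small sketch (assuming a Spark version with SQL table function support and an active `spark` session) of creating and invoking the function it describes:

```scala
// Create a SQL table function and query it, mirroring the example in the
// makeSQLTableFunctionPlan doc comment above.
spark.sql(
  "CREATE FUNCTION foo(x INT) RETURNS TABLE(a INT) RETURN SELECT x + 1 AS x1")

// The analyzer resolves this through a lateral join over the function body
// and casts the result column x1 to the declared return column a.
spark.sql("SELECT * FROM foo(1)").show()
// +---+
// |  a|
// +---+
// |  2|
// +---+
```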
Re: [PR] [SPARK-50892][SQL]Add UnionLoopExec, physical operator for recursion, to perform execution of recursive queries [spark]
cloud-fan commented on PR #49955:
URL: https://github.com/apache/spark/pull/49955#issuecomment-2705416036

@peter-toth Ideally, a recursive CTE should stop if the last iteration generates no data. Pushing down the LIMIT and applying an early stop is an optimization and should not change the query result.

With the above principle in mind, let's look at a concrete example: a recursive CTE that generates a one-row RDD (single partition) at each iteration, where the 100th iteration generates no data and stops the loop. With a global limit of 10, we can stop at the 10th iteration, as we have generated enough records. With a local limit of 10, we can't stop early and still need to wait until the 100th iteration, which at the end returns a union RDD with 100 partitions, each holding one row.
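For illustration only, a sketch (via the Scala API) of the shape of query being discussed; the CTE below is made up for the example, and the exact recursive-CTE syntax accepted by Spark may differ:

```scala
// A recursive CTE that emits one row per iteration and stops once the
// recursive step produces no data; the outer LIMIT is the one whose
// pushdown semantics (global vs. local limit) are being discussed.
spark.sql("""
  WITH RECURSIVE r(n) AS (
    SELECT 1
    UNION ALL
    SELECT n + 1 FROM r WHERE n < 100   -- the 100th step produces no rows
  )
  SELECT * FROM r LIMIT 10
""").show()
```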
[PR] [SPARK-51443] Fix singleVariantColumn in DSv2 and readStream. [spark]
chenhao-db opened a new pull request, #50217:
URL: https://github.com/apache/spark/pull/50217

### What changes were proposed in this pull request?

The current JSON `singleVariantColumn` mode doesn't work in DSv2 and `spark.readStream`. This PR fixes the two cases:
- DSv1 calls `JsonFileFormat.inferSchema`, which calls `JsonFileFormat.inferSchema`; DSv2 calls `JsonFileFormat.inferSchema`. The previous `singleVariantColumn` code was in `JsonFileFormat.inferSchema`, and is now moved into `JsonFileFormat.inferSchema`, so that both cases can be covered.
- `spark.readStream` requires that there must be a user-specified schema. `singleVariantColumn` plays the same role as a user-specified schema, but the check would fail.

It also includes a small refactor that moves the option name definition `singleVariantColumn` from `JSONOptions` to `DataSourceOptions`. It will be a common option name shared by multiple data sources (e.g., CSV) when we add the implementation in the future.

### Why are the changes needed?

It is a bug fix that improves the usability of variant.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Unit test. A test previously in `VariantSuite` is moved to `JsonSuite`, so that we can test the read behavior in both `JsonV1Suite` and `JsonV2Suite`. The test is also extended to include `spark.readStream`.

### Was this patch authored or co-authored using generative AI tooling?

No.
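A hedged usage sketch (the paths and the variant column name below are placeholders, not from the PR) of the `singleVariantColumn` mode this fix targets, for both batch and streaming reads:

```scala
// Batch read: every JSON record is parsed into a single variant column
// named "var" instead of an inferred struct schema.
val batchDf = spark.read
  .format("json")
  .option("singleVariantColumn", "var")
  .load("/path/to/json")   // placeholder path

// Streaming read: with this fix, the option stands in for a user-specified
// schema, so readStream no longer rejects the query for missing a schema.
val streamDf = spark.readStream
  .format("json")
  .option("singleVariantColumn", "var")
  .load("/path/to/json/dir")   // placeholder path
```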
Re: [PR] [SPARK-45265][SQL] Support Hive 4.0 metastore [spark]
hidataplus commented on code in PR #48823:
URL: https://github.com/apache/spark/pull/48823#discussion_r1986028539

## sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala:

```
@@ -1030,7 +1030,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     }
     val metaStoreParts = partsWithLocation
       .map(p => p.copy(spec = toMetaStorePartitionSpec(p.spec)))
-    client.createPartitions(db, table, metaStoreParts, ignoreIfExists)
+    client.createPartitions(tableMeta, metaStoreParts, ignoreIfExists)
```

Review Comment:
   why?
Re: [PR] [SPARK-51338][INFRA] Add automated CI build for `connect-examples` [spark]
LuciferYang commented on PR #50187:
URL: https://github.com/apache/spark/pull/50187#issuecomment-2708296150

I've noticed a rather peculiar issue here. It seems that the `connect-examples` project depends on a released version of Spark, which means we can only update to a new version after it has been officially released. For example, when Spark 4.0.0 is released, the version of this project in the published v4.0.0 tag won't be Spark 4.0.0, but rather some previously released version. Am I right about this? If I'm wrong, please correct me. If I'm right, doesn't that seem a bit strange?

Therefore, I still believe that the `connect-examples` module should be placed outside of the Spark repo, so that we can update and release it in another codebase after Spark 4.0.0 (or another official version) is out.

https://github.com/apache/spark/blob/f40bf4dd7166e86ea5cc9962e8c5b68b88f8dcb5/connect-examples/server-library-example/pom.xml#L19-L42

What do you think about this? @HyukjinKwon @hvanhovell @dongjoon-hyun @cloud-fan @srowen @yaooqinn
[PR] [SPARK-51359][CORE][SQL] Set INT64 as the default timestamp type for Parquet files [spark]
ganeshashree opened a new pull request, #50215:
URL: https://github.com/apache/spark/pull/50215

### What changes were proposed in this pull request?

Changes done to set INT64 as the default timestamp type for Parquet files.

### Why are the changes needed?

The INT96 timestamp type has been deprecated as part of [PARQUET-323](https://issues.apache.org/jira/browse/PARQUET-323). However, Apache Spark still uses INT96 as the default outputTimestampType for Parquet files ([code link](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L1157)). This could create incompatibilities when Parquet data written by Spark is read by readers that do not support the INT96 type. We should consider changing the default timestamp type to INT64 in future versions.

### Does this PR introduce _any_ user-facing change?

The default timestamp type for Parquet files will be changed to INT64. Older versions of applications that support INT96 should enable the INT96 type by setting `spark.sql.parquet.int96AsTimestamp` to `true` and `spark.sql.parquet.outputTimestampType` to `INT96`.

### How was this patch tested?

Updated the unit tests in ParquetSchemaSuite and SQLConfSuite to reflect INT64 as the default timestamp type for Parquet files.

### Was this patch authored or co-authored using generative AI tooling?
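For reference, a short sketch of how an application could pin the Parquet timestamp behavior explicitly; the config keys are the ones named above, and `TIMESTAMP_MICROS` is the INT64-based value of `outputTimestampType`:

```scala
// Keep the legacy INT96 behavior for readers that still expect it.
spark.conf.set("spark.sql.parquet.int96AsTimestamp", "true")
spark.conf.set("spark.sql.parquet.outputTimestampType", "INT96")

// Or opt into the INT64-based representation proposed as the new default.
spark.conf.set("spark.sql.parquet.outputTimestampType", "TIMESTAMP_MICROS")
```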
Re: [PR] [SPARK-43221][CORE] Host local block fetching should use a block status of a block stored on disk [spark]
attilapiros commented on code in PR #50122:
URL: https://github.com/apache/spark/pull/50122#discussion_r1986090749

## core/src/test/scala/org/apache/spark/storage/BlockManagerSuite.scala:

```
@@ -474,6 +474,26 @@ class BlockManagerSuite extends SparkFunSuite with Matchers with PrivateMethodTe
     assert(!BlockManagerId("notADriverIdentifier", "XXX", 1).isDriver)
   }

+  test("SPARK-43221: Host local block fetching should use a block status with disk size") {
+    conf.set(IO_ENCRYPTION_ENABLED, true)
```

Review Comment:
   Yes, it is needed to reproduce the error. Otherwise we would read the complete file (i.e. use the file size instead of the size obtained from `getLocationsAndStatus`, which might be the in-memory block size before this PR).

   The size obtained from `BlockManagerMasterEndpoint#getLocationsAndStatus()` and passed as `blockSize`:
   https://github.com/apache/spark/blob/4231d58245251a34ae80a38ea4bbf7d720caa439/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1240

   The file size obtained as `file.length`:
   https://github.com/apache/spark/blob/4231d58245251a34ae80a38ea4bbf7d720caa439/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L1244
Re: [PR] [SPARK-45265][SQL] Support Hive 4.0 metastore [spark]
hidataplus commented on code in PR #48823:
URL: https://github.com/apache/spark/pull/48823#discussion_r1986020465

## sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala:

```
@@ -177,8 +179,10 @@ private[hive] class HiveClientImpl(
     // got changed. We reset it to clientLoader.ClassLoader here.
     state.getConf.setClassLoader(clientLoader.classLoader)
     shim.setCurrentSessionState(state)
-    state.out = new PrintStream(outputBuffer, true, UTF_8.name())
-    state.err = new PrintStream(outputBuffer, true, UTF_8.name())
+    val clz = state.getClass.getField("out").getType.asInstanceOf[Class[_ <: PrintStream]]
+    val ctor = clz.getConstructor(classOf[OutputStream], classOf[Boolean], classOf[String])
+    state.getClass.getField("out").set(state, ctor.newInstance(outputBuffer, true, UTF_8.name()))
```

Review Comment:
   The build fails with: "the result type of an implicit conversion must be more specific than Object"
Re: [PR] [SPARK-51365][SQL][TESTS] Reduce `SHUFFLE_EXCHANGE_MAX_THREAD_THRESHOLD/RESULT_QUERY_STAGE_MAX_THREAD_THRESHOLD` for tests related to `SharedSparkSession/TestHive` when using `macOS + Apple S
LuciferYang commented on PR #50206:
URL: https://github.com/apache/spark/pull/50206#issuecomment-2708248561

The PR title and description will be updated after the finalization of the plan.
Re: [PR] [SPARK-51182][SQL] DataFrameWriter should throw dataPathNotSpecifiedError when path is not specified [spark]
LuciferYang commented on PR #49928:
URL: https://github.com/apache/spark/pull/49928#issuecomment-2708280968

> @vrozov can you remove the java test? [#49928 (comment)](https://github.com/apache/spark/pull/49928#discussion_r1981021185)

+1 for @cloud-fan's comments