sarutak opened a new pull request, #52575:
URL: https://github.com/apache/spark/pull/52575
### What changes were proposed in this pull request?
In the Spark Connect environment, `QueryExecution#observedMetrics` can be called
concurrently by two threads:
* Thread 1 (`ObservationManager`)
```scala
private def tryComplete(qe: QueryExecution): Unit = {
  val allMetrics = qe.observedMetrics
  qe.logical.foreach {
    case c: CollectMetrics =>
      allMetrics.get(c.name).foreach { metrics =>
        val observation = observations.remove((c.name, c.dataframeId))
        if (observation != null) {
          observation.setMetricsAndNotify(metrics)
        }
      }
    case _ =>
  }
}
```
* Thread 2 (`SparkConnectPlanExecution`)
```scala
private def createObservedMetricsResponse(
    sessionId: String,
    observationAndPlanIds: Map[String, Long],
    dataframe: DataFrame): Option[ExecutePlanResponse] = {
  val observedMetrics = dataframe.queryExecution.observedMetrics.collect {
    case (name, row) if !executeHolder.observations.contains(name) =>
      val values = SparkConnectPlanExecution.toObservedMetricsValues(row)
      name -> values
  }
  // ...
}
```
This can cause a race condition, and it has already caused the following CI
failure:
https://github.com/apache/spark/actions/runs/18422173471/job/52497913985
```
======================================================================
ERROR [0.181s]: test_observe_with_map_type (pyspark.sql.tests.connect.test_parity_observation.DataFrameObservationParityTests.test_observe_with_map_type)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/__w/spark/spark/python/pyspark/testing/utils.py", line 228, in wrapper
    lastValue = condition(*args, **kwargs)
                ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/tests/test_observation.py", line 226, in test_observe_with_map_type
    assertDataFrameEqual(df, [Row(id=id) for id in range(10)])
  File "/__w/spark/spark/python/pyspark/testing/utils.py", line 1098, in assertDataFrameEqual
    actual_list = actual.collect()
                  ^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1817, in collect
    table, schema = self._to_table()
                    ^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/dataframe.py", line 1830, in _to_table
    table, schema, self._execution_info = self._session.client.to_table(
                                          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 946, in to_table
    table, schema, metrics, observed_metrics, _ = self._execute_and_fetch(req, observations)
                                                  ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1642, in _execute_and_fetch
    for response in self._execute_and_fetch_as_iterator(
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1619, in _execute_and_fetch_as_iterator
    self._handle_error(error)
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1893, in _handle_error
    self._handle_rpc_error(error)
  File "/__w/spark/spark/python/pyspark/sql/connect/client/core.py", line 1966, in _handle_rpc_error
    raise convert_exception(
pyspark.errors.exceptions.connect.IllegalArgumentException: requirement failed
JVM stacktrace:
java.lang.IllegalArgumentException
	at scala.Predef$.require(Predef.scala:324)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapData.<init>(ArrayBasedMapData.scala:31)
	at org.apache.spark.sql.catalyst.util.ArrayBasedMapBuilder.build(ArrayBasedMapBuilder.scala:130)
	at org.apache.spark.sql.catalyst.expressions.CreateMap.eval(complexTypeCreator.scala:260)
	at org.apache.spark.sql.catalyst.expressions.Alias.eval(namedExpressions.scala:162)
	at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:84)
	at org.apache.spark.sql.execution.AggregatingAccumulator.$anonfun$value$2(AggregatingAccumulator.scala:199)
	at org.apache.spark.sql.internal.SQLConf$.withExistingConf(SQLConf.scala:162)
	at org.apache.spark.sql.execution.AggregatingAccumulator.withSQLConf(AggregatingAccumulator.scala:106)
	at org.apache.spark.sql.execution.AggregatingAccumulator.value(AggregatingAccumulator.scala:188)
	at org.apache.spark.sql.execution.CollectMetricsExec.collectedMetrics(CollectMetricsExec.scala:59)
	at org.apache.spark.sql.execution.CollectMetricsExec$$anonfun$1.applyOrElse(CollectMetricsExec.scala:111)
	at org.apache.spark.sql.execution.CollectMetricsExec$$anonfun$1.applyOrElse(CollectMetricsExec.scala:109)
	at scala.PartialFunction$Lifted.apply(PartialFunction.scala:338)
	at scala.PartialFunction$Lifted.apply(PartialFunction.scala:334)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collect$1(AdaptiveSparkPlanHelper.scala:86)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collect$1$adapted(AdaptiveSparkPlanHelper.scala:86)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.foreach(AdaptiveSparkPlanHelper.scala:45)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.foreach$(AdaptiveSparkPlanHelper.scala:44)
	at org.apache.spark.sql.execution.CollectMetricsExec$.foreach(CollectMetricsExec.scala:101)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collect(AdaptiveSparkPlanHelper.scala:86)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collect$(AdaptiveSparkPlanHelper.scala:83)
	at org.apache.spark.sql.execution.CollectMetricsExec$.collect(CollectMetricsExec.scala:101)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.$anonfun$collectWithSubqueries$1(AdaptiveSparkPlanHelper.scala:113)
	at scala.collection.immutable.List.flatMap(List.scala:294)
	at scala.collection.immutable.List.flatMap(List.scala:79)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collectWithSubqueries(AdaptiveSparkPlanHelper.scala:113)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper.collectWithSubqueries$(AdaptiveSparkPlanHelper.scala:112)
	at org.apache.spark.sql.execution.CollectMetricsExec$.collectWithSubqueries(CollectMetricsExec.scala:101)
	at org.apache.spark.sql.execution.CollectMetricsExec$.collect(CollectMetricsExec.scala:109)
	at org.apache.spark.sql.execution.QueryExecution.observedMetrics(QueryExecution.scala:276)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.createObservedMetricsResponse(SparkConnectPlanExecution.scala:322)
	at org.apache.spark.sql.connect.execution.SparkConnectPlanExecution.handlePlan(SparkConnectPlanExecution.scala:82)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1(ExecuteThreadRunner.scala:224)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.$anonfun$executeInternal$1$adapted(ExecuteThreadRunner.scala:196)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$2(SessionHolder.scala:394)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:804)
	at org.apache.spark.sql.connect.service.SessionHolder.$anonfun$withSession$1(SessionHolder.scala:394)
	at org.apache.spark.JobArtifactSet$.withActiveJobArtifactState(JobArtifactSet.scala:94)
	at org.apache.spark.sql.artifact.ArtifactManager.$anonfun$withResources$1(ArtifactManager.scala:113)
	at org.apache.spark.util.Utils$.withContextClassLoader(Utils.scala:184)
	at org.apache.spark.sql.artifact.ArtifactManager.withClassLoaderIfNeeded(ArtifactManager.scala:103)
	at org.apache.spark.sql.artifact.ArtifactManager.withResources(ArtifactManager.scala:112)
	at org.apache.spark.sql.connect.service.SessionHolder.withSession(SessionHolder.scala:393)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.executeInternal(ExecuteThreadRunner.scala:196)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner.org$apache$spark$sql$connect$execution$ExecuteThreadRunner$$execute(ExecuteThreadRunner.scala:125)
	at org.apache.spark.sql.connect.execution.ExecuteThreadRunner$ExecutionThread.run(ExecuteThreadRunner.scala:333)
```
This test failure can be reproduced by inserting a sleep into
`ArrayBasedMapBuilder.scala` as follows:
```diff
 private def reset(): Unit = {
   keyToIndex.clear()
   keys.clear()
+  Thread.sleep(10)
   values.clear()
 }
```
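The sleep matters because it sits between two mutations that other readers should never observe half-done. The sketch below replays, on a single thread, one interleaving that two concurrent callers could produce. `MiniMapBuilder`, `MiniMapData`, and `InterleavingDemo` are hypothetical simplifications, not Spark classes; the only borrowed idea is the `require` in `ArrayBasedMapData.<init>` seen in the stack trace, assumed here to check that key and value counts match.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for ArrayBasedMapData; the invariant below is an
// assumption modelled on the `require` that fails in the stack trace.
final case class MiniMapData(keys: List[Any], values: List[Any]) {
  require(keys.length == values.length) // message: "requirement failed"
}

// Hypothetical, simplified builder whose buffers are shared across calls.
final class MiniMapBuilder {
  private val keys = ArrayBuffer.empty[Any]
  private val values = ArrayBuffer.empty[Any]

  def put(k: Any, v: Any): Unit = { keys += k; values += v }

  def build(): MiniMapData = MiniMapData(keys.toList, values.toList)

  // reset() split into its two steps so an interleaving can be replayed.
  def clearKeys(): Unit = keys.clear()
  def clearValues(): Unit = values.clear()
}

object InterleavingDemo {
  // Thread A is inside reset() (keys cleared, values not yet) when thread B
  // calls build(). A Thread.sleep between the two clears widens this gap.
  def replayRace(): Either[String, MiniMapData] = {
    val builder = new MiniMapBuilder
    builder.put("a", 1)
    builder.clearKeys() // thread A: keys.clear()
    val result: Either[String, MiniMapData] =
      try Right(builder.build()) // thread B: sees 0 keys but 1 value
      catch { case e: IllegalArgumentException => Left(e.getMessage) }
    builder.clearValues() // thread A: values.clear()
    result
  }

  def main(args: Array[String]): Unit =
    println(replayRace()) // prints Left(requirement failed)
}
```

Without the sleep, the window between the two `clear()` calls is very short, which would explain why the failure is hard to hit on its own.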
Then run the test as follows:
```
$ python/run-tests --modules=pyspark-connect --parallelism=1 \
    --testnames pyspark.sql.tests.connect.test_parity_observation \
    --python-executables=python3.11
```
To fix this issue, this PR proposes making `QueryExecution#observedMetrics`
run within a synchronized block so that concurrent callers are serialized.
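A minimal sketch of the locking pattern, assuming a hypothetical `SharedMetricsSource` in place of `QueryExecution`: its unsynchronized computation mutates shared scratch state, and `observedMetrics` wraps it in `synchronized` so concurrent callers (here a small thread pool standing in for `ObservationManager` and `SparkConnectPlanExecution`) run one at a time. This illustrates the idea only; it is not the PR's diff, and which monitor the real change locks on is not stated above.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._

// Hypothetical stand-in for QueryExecution: every call to observedMetrics
// recomputes its result from shared, non-thread-safe scratch state.
final class SharedMetricsSource {
  private val scratch = ArrayBuffer.empty[Int]
  private val inFlight = new AtomicInteger(0)
  @volatile var overlapDetected = false

  // Not thread-safe on its own: concurrent callers would share `scratch`.
  private def computeMetrics(): Map[String, Int] = {
    if (inFlight.incrementAndGet() > 1) overlapDetected = true
    try {
      scratch.clear()
      (1 to 100).foreach(i => scratch += i)
      Map("sum" -> scratch.sum)
    } finally {
      inFlight.decrementAndGet()
    }
  }

  // The fix pattern: all callers take this object's monitor, so at most one
  // thread is inside computeMetrics() at any time.
  def observedMetrics: Map[String, Int] = synchronized { computeMetrics() }
}

object SynchronizedDemo {
  // Releases `threads` workers at once, each calling observedMetrics repeatedly.
  def run(threads: Int = 8, callsPerThread: Int = 1000): (Boolean, Set[Map[String, Int]]) = {
    val source = new SharedMetricsSource
    val results = ConcurrentHashMap.newKeySet[Map[String, Int]]()
    val start = new CountDownLatch(1)
    val pool = Executors.newFixedThreadPool(threads)
    (1 to threads).foreach { _ =>
      pool.execute(() => {
        start.await()
        (1 to callsPerThread).foreach(_ => results.add(source.observedMetrics))
      })
    }
    start.countDown()
    pool.shutdown()
    pool.awaitTermination(30, TimeUnit.SECONDS)
    (source.overlapDetected, results.asScala.toSet)
  }
}
```

With the lock in place, no two calls overlap and every caller sees `Map("sum" -> 5050)`. Dropping the `synchronized` wrapper typically (though not deterministically) lets calls overlap and corrupt `scratch`. Locking serializes the callers rather than making the underlying builder thread-safe, which keeps the change small.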
### Why are the changes needed?
Bug fix.
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
Ran the problematic test with the sleep inserted as described above and
confirmed that it passed.
### Was this patch authored or co-authored using generative AI tooling?
No.