vrozov opened a new pull request, #50594:
URL: https://github.com/apache/spark/pull/50594

   ### What changes were proposed in this pull request?
   Do not hold the `uninterruptibleLock` monitor while calling `super.interrupt()` in `UninterruptibleThread`. Instead, use the newly introduced `awaitInterruptThread` flag and wait for `super.interrupt()` to complete.
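   
   A minimal sketch of the intended handshake is below. Only `uninterruptibleLock`, `awaitInterruptThread`, and the deferred call to `super.interrupt()` come from the description above (plus `runUninterruptibly` from the existing `UninterruptibleThread` API); the rest of the structure is illustrative and not the actual patch:
   ```scala
   // Illustrative sketch only: interrupt() never calls super.interrupt() while
   // holding uninterruptibleLock; it announces the pending interrupt via a flag.
   class UninterruptibleThreadSketch(name: String) extends Thread(name) {
     private val uninterruptibleLock = new Object
     private var awaitInterruptThread = false // super.interrupt() in flight outside the lock
     private var uninterruptible = false      // inside runUninterruptibly
     private var shouldInterrupt = false      // interrupt deferred until the section ends
   
     override def interrupt(): Unit = {
       val interruptNow = uninterruptibleLock.synchronized {
         if (uninterruptible) { shouldInterrupt = true; false }
         else { awaitInterruptThread = true; true }
       }
       if (interruptNow) {
         try super.interrupt()                // NOT holding uninterruptibleLock here
         finally uninterruptibleLock.synchronized {
           awaitInterruptThread = false
           uninterruptibleLock.notifyAll()
         }
       }
     }
   
     // Must be called from this thread itself.
     def runUninterruptibly[T](body: => T): T = {
       uninterruptibleLock.synchronized {
         // Wait for any in-flight super.interrupt() before entering the section.
         while (awaitInterruptThread) uninterruptibleLock.wait()
         uninterruptible = true
       }
       try body finally {
         val deferred = uninterruptibleLock.synchronized {
           uninterruptible = false
           val d = shouldInterrupt; shouldInterrupt = false; d
         }
         if (deferred) super.interrupt()      // deliver the interrupt that was held back
       }
     }
   }
   ```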
   
   ### Why are the changes needed?
   There is a potential deadlock: an `UninterruptibleThread` may be blocked in an NIO operation, and interrupting it (which closes the interruptible channel) while holding the `uninterruptibleLock` monitor can deadlock. In the dump below, the task thread holds the `DFSOutputStream` monitor and waits for `uninterruptibleLock`, while the task reaper holds `uninterruptibleLock` and, via `Thread.interrupt()` closing the channel and flushing the stream, waits for the `DFSOutputStream` monitor:
   ```
   Found one Java-level deadlock:
   =============================
   "Executor 566 task launch worker for task 202024534, task 19644.1 in stage 
13967543.0 of app application_1736396393732_100191":
     waiting to lock monitor 0x00007f9435525aa0 (object 0x00007f9575000a70, a 
java.lang.Object),
     which is held by "Task reaper-9"
   
   "Task reaper-9":
     waiting to lock monitor 0x00007fa06b315500 (object 0x00007f963d0af788, a 
org.apache.hadoop.hdfs.DFSOutputStream),
     which is held by "Executor 566 task launch worker for task 202024534, task 
19644.1 in stage 13967543.0 of app application_1736396393732_100191"
   
   Java stack information for the threads listed above:
   ===================================================
   "Executor 566 task launch worker for task 202024534, task 19644.1 in stage 
13967543.0 of app application_1736396393732_100191":
        at 
org.apache.spark.util.UninterruptibleThread.interrupt(UninterruptibleThread.scala:96)
        - waiting to lock <0x00007f9575000a70> (a java.lang.Object)
        at 
org.apache.hadoop.hdfs.DataStreamer.waitAndQueuePacket(DataStreamer.java:989)
        - locked <0x00007f963d0af760> (a java.util.LinkedList)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.enqueueCurrentPacket(DFSOutputStream.java:496)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.enqueueCurrentPacketFull(DFSOutputStream.java:505)
        - locked <0x00007f963d0af788> (a org.apache.hadoop.hdfs.DFSOutputStream)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.writeChunk(DFSOutputStream.java:445)
        - locked <0x00007f963d0af788> (a org.apache.hadoop.hdfs.DFSOutputStream)
        at 
org.apache.hadoop.fs.FSOutputSummer.writeChecksumChunks(FSOutputSummer.java:218)
        at 
org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:165)
        - eliminated <0x00007f963d0af788> (a 
org.apache.hadoop.hdfs.DFSOutputStream)
        at 
org.apache.hadoop.fs.FSOutputSummer.flushBuffer(FSOutputSummer.java:146)
        - locked <0x00007f963d0af788> (a org.apache.hadoop.hdfs.DFSOutputStream)
        at org.apache.hadoop.fs.FSOutputSummer.write1(FSOutputSummer.java:137)
        at org.apache.hadoop.fs.FSOutputSummer.write(FSOutputSummer.java:112)
        - locked <0x00007f963d0af788> (a org.apache.hadoop.hdfs.DFSOutputStream)
        at 
org.apache.hadoop.fs.FSDataOutputStream$PositionCache.write(FSDataOutputStream.java:62)
        at 
java.io.DataOutputStream.write(java.base@17.0.6/DataOutputStream.java:112)
        - locked <0x00007f963d0b0a70> (a 
org.apache.hadoop.hdfs.client.HdfsDataOutputStream)
        at 
org.apache.parquet.hadoop.util.HadoopPositionOutputStream.write(HadoopPositionOutputStream.java:50)
        at 
java.nio.channels.Channels$WritableByteChannelImpl.write(java.base@17.0.6/Channels.java:463)
        - locked <0x00007f965dfad498> (a java.lang.Object)
        at 
org.apache.parquet.bytes.ConcatenatingByteBufferCollector.writeAllTo(ConcatenatingByteBufferCollector.java:77)
        at 
org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:1341)
        at 
org.apache.parquet.hadoop.ParquetFileWriter.writeColumnChunk(ParquetFileWriter.java:1262)
        at 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore$ColumnChunkPageWriter.writeToFileWriter(ColumnChunkPageWriteStore.java:408)
        at 
org.apache.parquet.hadoop.ColumnChunkPageWriteStore.flushToFileWriter(ColumnChunkPageWriteStore.java:675)
        at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.flushRowGroupToStore(InternalParquetRecordWriter.java:210)
        at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.checkBlockSizeReached(InternalParquetRecordWriter.java:178)
        at 
org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:154)
        at 
org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:240)
        at 
org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:41)
        at 
org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:39)
        at 
org.apache.spark.sql.execution.datasources.BaseDynamicPartitionDataWriter.writeRecord(FileFormatDataWriter.scala:357)
        at 
org.apache.spark.sql.execution.datasources.DynamicPartitionDataSingleWriter.write(FileFormatDataWriter.scala:403)
        at 
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithMetrics(FileFormatDataWriter.scala:86)
        at 
org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:93)
        at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$2(FileFormatWriter.scala:501)
        at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$$$Lambda$5526/0x00000008033c9870.apply(Unknown
 Source)
        at 
org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1412)
        at 
org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:508)
        at 
org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$3(WriteFiles.scala:126)
        at 
org.apache.spark.sql.execution.datasources.WriteFilesExec$$Lambda$5678/0x00000008034b9b10.apply(Unknown
 Source)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:931)
        at 
org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:931)
        at 
org.apache.spark.rdd.RDD$$Lambda$1530/0x00000008016c6000.apply(Unknown Source)
        at 
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:405)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
        at 
org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
        at org.apache.spark.scheduler.Task.run(Task.scala:154)
        at 
org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:622)
        at 
org.apache.spark.executor.Executor$TaskRunner$$Lambda$965/0x0000000801376d80.apply(Unknown
 Source)
        at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
        at 
org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:625)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.6/ThreadPoolExecutor.java:1136)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.6/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17.0.6/Thread.java:833)
   "Task reaper-9":
        at 
org.apache.hadoop.hdfs.DFSOutputStream.flushOrSync(DFSOutputStream.java:636)
        - waiting to lock <0x00007f963d0af788> (a 
org.apache.hadoop.hdfs.DFSOutputStream)
        at 
org.apache.hadoop.hdfs.DFSOutputStream.hflush(DFSOutputStream.java:585)
        at 
org.apache.hadoop.fs.FSDataOutputStream.hflush(FSDataOutputStream.java:136)
        at 
org.apache.parquet.hadoop.util.HadoopPositionOutputStream.close(HadoopPositionOutputStream.java:65)
        at 
java.nio.channels.Channels$WritableByteChannelImpl.implCloseChannel(java.base@17.0.6/Channels.java:475)
        at 
java.nio.channels.spi.AbstractInterruptibleChannel$1.interrupt(java.base@17.0.6/AbstractInterruptibleChannel.java:162)
        - locked <0x00007f965dfb1340> (a java.lang.Object)
        at java.lang.Thread.interrupt(java.base@17.0.6/Thread.java:997)
        - locked <0x00007f9575003ab0> (a java.lang.Object)
        at 
org.apache.spark.util.UninterruptibleThread.interrupt(UninterruptibleThread.scala:99)
        - locked <0x00007f9575000a70> (a java.lang.Object)
        at org.apache.spark.scheduler.Task.kill(Task.scala:263)
        at 
org.apache.spark.executor.Executor$TaskRunner.kill(Executor.scala:495)
        - locked <0x00007f963d1364e0> (a 
org.apache.spark.executor.Executor$TaskRunner)
        at 
org.apache.spark.executor.Executor$TaskReaper.run(Executor.scala:1001)
        at 
java.util.concurrent.ThreadPoolExecutor.runWorker(java.base@17.0.6/ThreadPoolExecutor.java:1136)
        at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(java.base@17.0.6/ThreadPoolExecutor.java:635)
        at java.lang.Thread.run(java.base@17.0.6/Thread.java:833)
   
   Found 1 deadlock.
   ```
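   
   The cycle is a plain ABBA lock inversion. A self-contained, hypothetical reduction of its shape (the lock names below are stand-ins for the `DFSOutputStream` monitor and `uninterruptibleLock`, not the real classes):
   ```scala
   // Hangs forever: each thread holds one lock and waits for the other.
   object DeadlockShape {
     private val streamLock = new Object     // stands in for the DFSOutputStream monitor
     private val interruptLock = new Object  // stands in for uninterruptibleLock
   
     def main(args: Array[String]): Unit = {
       val task = new Thread(() => streamLock.synchronized {
         Thread.sleep(100)                   // "writing a packet"
         interruptLock.synchronized {}       // then needs the interrupt lock
       }, "task")
       val reaper = new Thread(() => interruptLock.synchronized {
         Thread.sleep(100)                   // "interrupting the task"
         streamLock.synchronized {}          // channel close -> flush needs the stream monitor
       }, "reaper")
       task.start(); reaper.start()
       task.join(); reaper.join()            // never returns
     }
   }
   ```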
   
   
   ### Does this PR introduce _any_ user-facing change?
   No
   
   ### How was this patch tested?
   TBD
   
   ### Was this patch authored or co-authored using generative AI tooling?
   No
   

