xuanyuanking commented on code in PR #49413:
URL: https://github.com/apache/spark/pull/49413#discussion_r1911781043


##########
core/src/test/scala/org/apache/spark/JobCancellationSuite.scala:
##########
@@ -712,6 +713,140 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     assert(executionOfInterruptibleCounter.get() < numElements)
  }
 
+  Seq(true, false).foreach { interruptible =>
+
+    val (hint1, hint2) = if (interruptible) {
+      (" not", "")
+    } else {
+      ("", " not")
+    }
+
+    val testName = s"SPARK-50768:$hint1 use TaskContext.createResourceUninterruptibly " +
+      s"would$hint2 cause stream leak on task interruption"
+
+    test(testName) {
+      import org.apache.spark.JobCancellationSuite._
+      withTempDir { dir =>
+
+        // `InterruptionSensitiveInputStream` is designed to easily leak the underlying stream

Review Comment:
   It would be great to mention why `InterruptionSensitiveInputStream` can easily lead to stream leaks, and the key conditions involved, such as creating internal resources during `initialize()`, as highlighted in the PR description.
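
   For illustration, a minimal, self-contained Scala sketch of that key condition (illustrative code only, not the PR's; the `opened` flag stands in for a real file or native handle):

    import java.io.InputStream
    import java.util.concurrent.atomic.AtomicBoolean

    object LeakyStream {
      // True while the inner handle is open; if it stays true after the task
      // is cancelled, the resource has leaked.
      val opened = new AtomicBoolean(false)
    }

    class LeakyStream extends InputStream {
      private var underlying: InputStream = _

      def initialize(): InputStream = {
        val in: InputStream = new InputStream {
          LeakyStream.opened.set(true) // resource acquired as a side effect of construction
          override def read(): Int = -1
          override def close(): Unit = LeakyStream.opened.set(false)
        }
        // An interrupt landing here leaks `in`: it is already open, but the
        // outer close() only knows about `underlying`, which is still null.
        underlying = in
        underlying
      }

      override def read(): Int = if (underlying != null) underlying.read() else -1
      override def close(): Unit = if (underlying != null) underlying.close()
    }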



##########
core/src/main/scala/org/apache/spark/TaskContext.scala:
##########
@@ -305,4 +305,20 @@ abstract class TaskContext extends Serializable {
 
   /** Gets local properties set upstream in the driver. */
   private[spark] def getLocalProperties: Properties
+
+
+  /** Whether the current task is allowed to interrupt. */
+  private[spark] def interruptible(): Boolean
+
+  /**
+   *  Pending the interruption request until the task is able to
+   *  interrupt after creating the resource uninterruptibly.
+   */
+  private[spark] def pendingInterrupt(threadToInterrupt: Option[Thread], reason: String): Unit
+
+  /**
+   * Creating a closeable resource uninterruptibly. A task is not allowed to interrupt in this
+   * state until the resource creation finishes.
+   */
+  private[spark] def createResourceUninterruptibly[T <: Closeable](resourceBuilder: => T): T

Review Comment:
   I noticed that the description mentions a follow-up to apply this function. Could you briefly explain how this method is invoked, or provide a simple example?
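
   In case it helps, a sketch of what a call site might look like, inferred only from the signature in this hunk (the wrapper object and method are hypothetical; since the API is `private[spark]`, a caller would have to live under the `org.apache.spark` package):

    package org.apache.spark.sketch // hypothetical; must sit under org.apache.spark

    import java.io.{FileInputStream, InputStream}

    import org.apache.spark.TaskContext

    object UninterruptibleOpen {
      // The by-name resourceBuilder runs while the task refuses interruption;
      // per the scaladoc above, an interrupt arriving in the meantime stays
      // pending and is delivered once the resource is safely constructed.
      def open(path: String): InputStream = {
        val ctx = TaskContext.get()
        if (ctx != null) {
          ctx.createResourceUninterruptibly(new FileInputStream(path))
        } else {
          new FileInputStream(path) // not running in a task; nothing to guard
        }
      }
    }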



##########
core/src/main/scala/org/apache/spark/TaskContext.scala:
##########
@@ -305,4 +305,20 @@ abstract class TaskContext extends Serializable {
 
   /** Gets local properties set upstream in the driver. */
   private[spark] def getLocalProperties: Properties
+
+
+  /** Whether the current task is allowed to interrupt. */
+  private[spark] def interruptible(): Boolean
+
+  /**
+   *  Pending the interruption request until the task is able to

Review Comment:
   code style nit



##########
core/src/main/scala/org/apache/spark/TaskContext.scala:
##########
@@ -305,4 +305,20 @@ abstract class TaskContext extends Serializable {
 
   /** Gets local properties set upstream in the driver. */
   private[spark] def getLocalProperties: Properties
+

Review Comment:
   nit: extra line?



##########
core/src/test/scala/org/apache/spark/JobCancellationSuite.scala:
##########
@@ -712,6 +713,140 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft
     assert(executionOfInterruptibleCounter.get() < numElements)
  }
 
+  Seq(true, false).foreach { interruptible =>
+
+    val (hint1, hint2) = if (interruptible) {
+      (" not", "")
+    } else {
+      ("", " not")
+    }
+
+    val testName = s"SPARK-50768:$hint1 use TaskContext.createResourceUninterruptibly " +
+      s"would$hint2 cause stream leak on task interruption"
+
+    test(testName) {
+      import org.apache.spark.JobCancellationSuite._
+      withTempDir { dir =>
+
+        // `InterruptionSensitiveInputStream` is designed to easily leak the underlying stream
+        // when task thread interruption happens during its initialization.
+        class InterruptionSensitiveInputStream(fileHint: String) extends InputStream {
+          private var underlying: InputStream = _
+
+          def initialize(): InputStream = {
+            val in: InputStream = new InputStream {
+
+              open()
+
+              private def dumpFile(typeName: String): Unit = {
+                var fileOut: FileOutputStream = null
+                var objOut: ObjectOutputStream = null
+                try {
+                  val file = new File(dir, s"$typeName.$fileHint")
+                  fileOut = new FileOutputStream(file)
+                  objOut = new ObjectOutputStream(fileOut)
+                  objOut.writeBoolean(true)
+                  objOut.flush()
+                } finally {
+                  if (fileOut != null) {
+                    fileOut.close()
+                  }
+                  if (objOut != null) {
+                    objOut.close()
+                  }
+                }
+
+              }
+
+              private def open(): Unit = {
+                dumpFile("open")
+              }
+
+              override def close(): Unit = {
+                dumpFile("close")
+              }
+
+              override def read(): Int = -1
+            }
+
+            // Leave some time for the task to be interrupted during the
+            // creation of `InterruptionSensitiveInputStream`.
+            Thread.sleep(5000)

Review Comment:
   How important is this sleep within the task? Could it potentially make the test flaky?
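
   For what it's worth, one way to avoid the timing guess would be an explicit handshake instead of a fixed sleep. A sketch, assuming local mode where the task and the test share a JVM (the test already shares state via `JobCancellationSuite`'s companion object); the names here are hypothetical:

    import java.util.concurrent.{Semaphore, TimeUnit}

    object InterruptionHandshake {
      // Released by the task once it is inside the interruption-sensitive
      // window, i.e. where the Thread.sleep(5000) currently sits.
      val taskInWindow = new Semaphore(0)

      // Called by the test thread before cancelling the job, so cancellation
      // is attempted only after the task has actually entered the window.
      def awaitTaskInWindow(): Unit = {
        assert(taskInWindow.tryAcquire(1, TimeUnit.MINUTES),
          "task never reached the interruption-sensitive window")
      }
    }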


