xuanyuanking commented on code in PR #49413: URL: https://github.com/apache/spark/pull/49413#discussion_r1911781043
########## core/src/test/scala/org/apache/spark/JobCancellationSuite.scala: ########## @@ -712,6 +713,140 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft assert(executionOfInterruptibleCounter.get() < numElements) } + Seq(true, false).foreach { interruptible => + + val (hint1, hint2) = if (interruptible) { + (" not", "") + } else { + ("", " not") + } + + val testName = s"SPARK-50768:$hint1 use TaskContext.createResourceUninterruptibly " + + s"would$hint2 cause stream leak on task interruption" + + test(testName) { + import org.apache.spark.JobCancellationSuite._ + withTempDir { dir => + + // `InterruptionSensitiveInputStream` is designed to easily leak the underlying stream Review Comment: It would be great to mention why InterruptionSensitiveInputStream can easily lead to stream leaks and the key conditions, such as creating internal resources during initialize, as highlighted in the PR description. ########## core/src/main/scala/org/apache/spark/TaskContext.scala: ########## @@ -305,4 +305,20 @@ abstract class TaskContext extends Serializable { /** Gets local properties set upstream in the driver. */ private[spark] def getLocalProperties: Properties + + + /** Whether the current task is allowed to interrupt. */ + private[spark] def interruptible(): Boolean + + /** + * Pending the interruption request until the task is able to + * interrupt after creating the resource uninterruptibly. + */ + private[spark] def pendingInterrupt(threadToInterrupt: Option[Thread], reason: String): Unit + + /** + * Creating a closeable resource uninterruptibly. A task is not allowed to interrupt in this + * state until the resource creation finishes. + */ + private[spark] def createResourceUninterruptibly[T <: Closeable](resourceBuilder: => T): T Review Comment: I noticed that the description mentions a follow-up to apply this function. Could you briefly explain how this method is invoked or provide a simple example? 
########## core/src/main/scala/org/apache/spark/TaskContext.scala: ########## @@ -305,4 +305,20 @@ abstract class TaskContext extends Serializable { /** Gets local properties set upstream in the driver. */ private[spark] def getLocalProperties: Properties + + + /** Whether the current task is allowed to interrupt. */ + private[spark] def interruptible(): Boolean + + /** + * Pending the interruption request until the task is able to Review Comment: code style nit ########## core/src/main/scala/org/apache/spark/TaskContext.scala: ########## @@ -305,4 +305,20 @@ abstract class TaskContext extends Serializable { /** Gets local properties set upstream in the driver. */ private[spark] def getLocalProperties: Properties + Review Comment: nit: extra line? ########## core/src/test/scala/org/apache/spark/JobCancellationSuite.scala: ########## @@ -712,6 +713,140 @@ class JobCancellationSuite extends SparkFunSuite with Matchers with BeforeAndAft assert(executionOfInterruptibleCounter.get() < numElements) } + Seq(true, false).foreach { interruptible => + + val (hint1, hint2) = if (interruptible) { + (" not", "") + } else { + ("", " not") + } + + val testName = s"SPARK-50768:$hint1 use TaskContext.createResourceUninterruptibly " + + s"would$hint2 cause stream leak on task interruption" + + test(testName) { + import org.apache.spark.JobCancellationSuite._ + withTempDir { dir => + + // `InterruptionSensitiveInputStream` is designed to easily leak the underlying stream + // when task thread interruption happens during its initialization. 
+ class InterruptionSensitiveInputStream(fileHint: String) extends InputStream { + private var underlying: InputStream = _ + + def initialize(): InputStream = { + val in: InputStream = new InputStream { + + open() + + private def dumpFile(typeName: String): Unit = { + var fileOut: FileOutputStream = null + var objOut: ObjectOutputStream = null + try { + val file = new File(dir, s"$typeName.$fileHint") + fileOut = new FileOutputStream(file) + objOut = new ObjectOutputStream(fileOut) + objOut.writeBoolean(true) + objOut.flush() + } finally { + if (fileOut != null) { + fileOut.close() + } + if (objOut != null) { + objOut.close() + } + } + + } + + private def open(): Unit = { + dumpFile("open") + } + + override def close(): Unit = { + dumpFile("close") + } + + override def read(): Int = -1 + } + + // Leave some time for the task to be interrupted during the + // creation of `InterruptionSensitiveInputStream`. + Thread.sleep(5000) Review Comment: How important is this sleep within the task? Could it potentially make the test flaky? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For queries about this service, please contact Infrastructure at: users@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org For additional commands, e-mail: reviews-help@spark.apache.org