anishshri-db commented on code in PR #50773:
URL: https://github.com/apache/spark/pull/50773#discussion_r2072007801
########## sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala:
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileNotFoundException, InputStream}
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
+import java.util.zip.{CheckedInputStream, CheckedOutputStream, CRC32C}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.io.Source
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
+import org.apache.hadoop.fs._
+import org.json4s.{Formats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.SparkException
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{CHECKSUM, NUM_BYTES, PATH, TIMEOUT}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+import org.apache.spark.util.ThreadUtils
+
+/** Information about the creator of the checksum file. Useful for debugging. */
+case class ChecksumFileCreatorInfo(
+    executorId: String,
+    taskInfo: String)
+
+object ChecksumFileCreatorInfo {
+  def apply(): ChecksumFileCreatorInfo = {
+    val executorId = Option(SparkEnv.get).map(_.executorId).getOrElse("")
+    val taskInfo = Option(TaskContext.get()).map(tc =>
+      s"Task= ${tc.partitionId()}.${tc.attemptNumber()}, " +
+        s"Stage= ${tc.stageId()}.${tc.stageAttemptNumber()}").getOrElse("")
+    new ChecksumFileCreatorInfo(executorId, taskInfo)
+  }
+}
+
+/** This is the content of the checksum file.
+ * Holds the checksum value and additional information. */
+case class Checksum(
+    algorithm: String,
+    // Making this a string to be agnostic of the algorithm used and easily readable.
+    // We can change this to a byte array later if we start using algos with large values.
+    value: String,
+    mainFileSize: Long,
+    timestampMs: Long,
+    creator: ChecksumFileCreatorInfo) {
+
+  import Checksum._
+
+  def json(): String = {
+    mapper.writeValueAsString(this)
+  }
+}
+
+object Checksum {
+  implicit val format: Formats = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between class and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ClassTagExtensions
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def fromJson(json: String): Checksum = {
+    Serialization.read[Checksum](json)
+  }
+}
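For illustration, a minimal sketch of the checksum JSON round-trip using the case classes above; every field value here is made up:

    // All values are illustrative; only the shape of the JSON matters.
    val checksum = Checksum(
      algorithm = "CRC32C",
      value = "123456789",
      mainFileSize = 4096L,
      timestampMs = 1700000000000L,
      creator = ChecksumFileCreatorInfo("driver", "Task= 0.0, Stage= 1.0"))

    val json = checksum.json()
    // e.g. {"algorithm":"CRC32C","value":"123456789","mainFileSize":4096,...}
    assert(Checksum.fromJson(json) == checksum)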
+
+/** Holds the path of the checksum file and knows the main file path. */
+case class ChecksumFile(path: Path) {
+  import ChecksumCheckpointFileManager._
+  assert(isChecksumFile(path), "path is not a checksum file")
+
+  val mainFilePath = new Path(path.toString.stripSuffix(CHECKSUM_FILE_SUFFIX))
+
+  /** The name of the file without any extensions, e.g. my-file.txt.crc returns my-file. */
+  val baseName: String = path.getName.split("\\.").head
+}
+
+/**
+ * A [[CheckpointFileManager]] that creates a checksum file for the main file.
+ * This wraps another [[CheckpointFileManager]] and adds checksum functionality on top of it.
+ * Under the hood, when a file is created, it also creates a checksum file with the same name as
+ * the main file plus a suffix. It returns [[ChecksumCancellableFSDataOutputStream]],
+ * which handles the writing of the main file and checksum file.
+ *
+ * When a file is opened, it returns [[ChecksumFSDataInputStream]], which handles reading
+ * the main file and checksum file and does the checksum verification.
+ *
+ * In order to reduce the impact of reading/writing 2 files instead of 1, it uses a thread pool
+ * to read/write both files concurrently.
+ *
+ * @note
+ * It is able to read files written by other [[CheckpointFileManager]]s that don't have checksums.
+ * It automatically deletes the checksum file when the main file is deleted.
+ * If you delete the main file with a different type of manager, the checksum file will be
+ * left behind (i.e. an orphan checksum file), since those managers don't know about it. It is
+ * then your responsibility to delete the orphan checksum files.
+ *
+ * @param underlyingFileMgr The file manager to use under the hood.
+ * @param allowConcurrentDelete If true, allows deleting the main and checksum file concurrently.
+ *                              This is a perf optimization, but can potentially lead to
+ *                              orphan checksum files. If using this, it is your responsibility
+ *                              to clean up the potential orphan checksum files.
+ * @param numThreads The number of threads in the thread pool used for reading/writing files.
+ *                   To avoid blocking, if the file manager instance is used by a single
+ *                   thread, set this to 2 (one thread for the main file, another for the
+ *                   checksum file). If the file manager is shared by multiple threads, set it
+ *                   to (number of threads using the file manager) * 2.
+ *                   Setting this differently can lead to file operations being blocked waiting
+ *                   for a free thread.
+ */
+class ChecksumCheckpointFileManager(
+    private val underlyingFileMgr: CheckpointFileManager,
+    val allowConcurrentDelete: Boolean = false,
+    val numThreads: Int)
+  extends CheckpointFileManager with Logging {
+  assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 for the main file " +
+    "and another for the checksum file")
+
+  import ChecksumCheckpointFileManager._
+
+  // This allows us to concurrently read/write the main file and checksum file
+  private val threadPool = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonFixedThreadPool(numThreads, s"${this.getClass.getSimpleName}-Thread"))
+
+  override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
+    underlyingFileMgr.list(path, filter)
+  }
+
+  override def mkdirs(path: Path): Unit = {
+    underlyingFileMgr.mkdirs(path)
+  }
+
+  override def createAtomic(path: Path,
+      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    assert(!isChecksumFile(path), "Cannot directly create a checksum file")
+
+    val mainFileFuture = Future {
+      underlyingFileMgr.createAtomic(path, overwriteIfPossible)
+    }(threadPool)
+
+    val checksumFileFuture = Future {
+      underlyingFileMgr.createAtomic(getChecksumPath(path), overwriteIfPossible)
+    }(threadPool)
+
+    new ChecksumCancellableFSDataOutputStream(
+      awaitResult(mainFileFuture, Duration.Inf),
+      path,
+      awaitResult(checksumFileFuture, Duration.Inf),
+      threadPool
+    )
+  }
+
+  override def open(path: Path): FSDataInputStream = {
+    assert(!isChecksumFile(path), "Cannot directly open a checksum file")
+
+    val checksumInputStreamFuture = Future {
+      try {
+        Some(underlyingFileMgr.open(getChecksumPath(path)))
+      } catch {
+        // In case the client previously had file checksums disabled,
+        // previously created files won't have a checksum.
+        case _: FileNotFoundException =>
+          logWarning(log"No checksum file found for ${MDC(PATH, path)}, " +
+            log"hence no checksum verification.")
+          None
+      }
+    }(threadPool)
+
+    val mainInputStreamFuture = Future {
+      underlyingFileMgr.open(path)
+    }(threadPool)
+
+    val mainStream = awaitResult(mainInputStreamFuture, Duration.Inf)
+    val checksumStream = awaitResult(checksumInputStreamFuture, Duration.Inf)
+
+    checksumStream.map { chkStream =>
+      new ChecksumFSDataInputStream(mainStream, path, chkStream, threadPool)
+    }.getOrElse(mainStream)
+  }
+
+  override def exists(path: Path): Boolean = underlyingFileMgr.exists(path)
+
+  override def delete(path: Path): Unit = {
+    // Allow directly deleting the checksum file for the orphan checksum file scenario
+    if (isChecksumFile(path)) {
+      deleteChecksumFile(path)
+    } else if (allowConcurrentDelete) {
+      // Ideally, we should first try to delete the checksum file
+      // before the main file, to avoid a situation where the main file is deleted but the
+      // checksum file deletion failed. The client might not call delete again if the main file
+      // no longer exists.
+      // But if allowConcurrentDelete is enabled, then we can do it concurrently for perf,
+      // and the client is responsible for cleaning up potential orphan checksum files
+      // if that happens.
+      val checksumDeleteFuture = Future {
+        deleteChecksumFile(getChecksumPath(path))
+      }(threadPool)
+
+      val mainDeleteFuture = Future {
+        underlyingFileMgr.delete(path)
+      }(threadPool)
+
+      awaitResult(mainDeleteFuture, Duration.Inf)
+      awaitResult(checksumDeleteFuture, Duration.Inf)
+    } else {
+      // First delete the checksum file, then the main file
+      deleteChecksumFile(getChecksumPath(path))
+      underlyingFileMgr.delete(path)
+    }
+  }
+
+  private def deleteChecksumFile(checksumPath: Path): Unit = {
+    try {
+      underlyingFileMgr.delete(checksumPath)
+      logDebug(log"Deleted checksum file ${MDC(PATH, checksumPath)}")
+    } catch {
+      case _: FileNotFoundException =>
+        // Ignore if the file has already been deleted
+        // or the main file was created initially without a checksum
+        logWarning(log"Skipping deletion of checksum file ${MDC(PATH, checksumPath)} " +
+          log"since it does not exist.")
+    }
+  }
+
+  override def isLocal: Boolean = underlyingFileMgr.isLocal
+
+  override def createCheckpointDirectory(): Path = {
+    underlyingFileMgr.createCheckpointDirectory()
+  }
+
+  override def close(): Unit = {
+    threadPool.shutdown()
+    // Wait a bit for it to finish up in case there is any ongoing work.
+    // Can consider making this timeout configurable, if needed.
+    val timeoutMs = 100
+    if (!threadPool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
+      logWarning(log"Thread pool did not shutdown after ${MDC(TIMEOUT, timeoutMs)} ms")
+    }
+  }
+}
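As a rough usage sketch of the manager: the wrapped `FileSystemBasedCheckpointFileManager`, the checkpoint path, and the file names below are illustrative assumptions, not part of this PR:

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.Path

    val checkpointDir = new Path("/tmp/checkpoint")  // hypothetical location
    val underlying = new FileSystemBasedCheckpointFileManager(checkpointDir, new Configuration())

    // Single-threaded caller: one thread for the main file, one for the checksum file.
    val mgr = new ChecksumCheckpointFileManager(underlying, numThreads = 2)

    val out = mgr.createAtomic(new Path(checkpointDir, "1.delta"), overwriteIfPossible = false)
    out.write("data".getBytes(java.nio.charset.StandardCharsets.UTF_8))
    out.close()  // commits 1.delta and writes 1.delta.crc alongside it

    val in = mgr.open(new Path(checkpointDir, "1.delta"))
    // ... read ...
    in.close()   // checksum verification happens here
    mgr.close()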
+
+private[streaming] object ChecksumCheckpointFileManager {
+  val CHECKSUM_FILE_SUFFIX = ".crc"
+
+  def awaitResult[T](future: Future[T], atMost: Duration): T = {
+    try {
+      ThreadUtils.awaitResult(future, atMost)
+    } catch {
+      // awaitResult wraps the exception. Unwrap it, and throw the actual error
+      case e: SparkException if e.getMessage.contains("Exception thrown in awaitResult") =>
+        throw e.getCause
+    }
+  }
+
+  def getChecksumPath(mainFilePath: Path): Path = {
+    new Path(mainFilePath.toString + CHECKSUM_FILE_SUFFIX)
+  }
+
+  def isChecksumFile(path: Path): Boolean = {
+    path.getName.endsWith(CHECKSUM_FILE_SUFFIX)
+  }
+}
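The reader class below builds on `java.util.zip.CheckedInputStream`, which folds every byte read into a running checksum. A small self-contained illustration of that building block (not code from this PR):

    import java.io.ByteArrayInputStream
    import java.util.zip.{CheckedInputStream, CRC32C}

    val data = "some checkpoint bytes".getBytes("UTF-8")
    val checked = new CheckedInputStream(new ByteArrayInputStream(data), new CRC32C())

    // Draining the stream updates the checksum as a side effect of reading.
    val buf = new Array[Byte](8)
    while (checked.read(buf) != -1) {}

    // Same value as computing CRC32C over the whole array in one shot.
    val oneShot = new CRC32C()
    oneShot.update(data, 0, data.length)
    assert(checked.getChecksum.getValue == oneShot.getValue)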
+
+/** An implementation of [[FSDataInputStream]] that calculates the checksum of the file
+ * that the client is reading (the main file) incrementally, while it is being read.
+ * It then does checksum verification on close, to verify that the computed checksum
+ * matches the expected checksum in the checksum file.
+ *
+ * Computing the checksum incrementally and doing the verification after the file read is
+ * complete is for better performance, as opposed to first reading the entire file and doing
+ * verification before the client starts reading the file.
+ *
+ * @param mainStream Input stream for the main file the client wants to read
+ * @param path The path of the main file
+ * @param expectedChecksumStream The input stream for the checksum file
+ * @param threadPool Thread pool to use for concurrently operating on the main and checksum file
+ */
+class ChecksumFSDataInputStream(
+    private val mainStream: FSDataInputStream,
+    path: Path,
+    private val expectedChecksumStream: FSDataInputStream,
+    private val threadPool: ExecutionContext)
+  extends FSDataInputStream(new CheckedSequentialInputStream(mainStream)) with Logging {
+
+  import ChecksumCheckpointFileManager._
+
+  @volatile private var verified = false
+  @volatile private var closed = false
+
+  override def close(): Unit = {
+    if (!closed) {
+      // We verify the checksum only when the client is done reading.
+      verifyChecksum()
+      closeInternal()
+    }
+  }
+
+  /** This is used to skip checksum verification on close.
+   * Avoid using this; it is only meant for the situation where the file is opened, read,
+   * then closed multiple times, and we want to avoid doing verification each time
+   * and only want to do it once.
+   */
+  def closeWithoutChecksumVerification(): Unit = {
+    if (!closed) {
+      // Ideally this should be a warning, but if a file is doing this frequently
+      // it will cause unnecessary noise in the logs. This can be changed later.
+      logDebug(log"Closing file ${MDC(PATH, path)} without checksum verification")
+      closeInternal()
+    }
+  }
+
+  private def closeInternal(): Unit = {
+    closed = true
+
+    val mainCloseFuture = Future {
+      super.close() // close the main file
+    }(threadPool)
+
+    val checksumCloseFuture = Future {
+      expectedChecksumStream.close() // close the checksum file
+    }(threadPool)
+
+    awaitResult(mainCloseFuture, Duration.Inf)
+    awaitResult(checksumCloseFuture, Duration.Inf)
+  }
+
+  private def verifyChecksum(): Unit = {
+    if (!verified) {
+      // It is possible the file was not read till the end by the reader,
+      // but we need the entire file content for checksum verification.
+      // Hence, we will read the file till the end, from where the reader stopped.
+      var remainingBytesRead = 0
+      val buffer = new Array[Byte](1024)
+      var bytesRead = 0
+      while ({ bytesRead = super.read(buffer); bytesRead != -1 }) {
+        remainingBytesRead += bytesRead
+      }
+
+      // We are at the end position, so that tells us the size
+      val computedFileSize = mainStream.getPos
+
+      if (remainingBytesRead > 0) {
+        // Making this a debug log since most of our files are not read exactly to the end,
+        // and we don't want to cause unnecessary noise in the logs.
+        logDebug(log"File ${MDC(PATH, path)} was not read till the end by the reader. " +
+          log"Finished reading the rest of the file for checksum verification. " +
+          log"Remaining bytes read: ${MDC(NUM_BYTES, remainingBytesRead)}, " +
+          log"total size: ${MDC(NUM_BYTES, computedFileSize)}.")
+      }
+
+      // Read the expected checksum from the checksum file
+      val expectedChecksumJson = Source.fromInputStream(
+        expectedChecksumStream, StandardCharsets.UTF_8.name()).mkString
+      val expectedChecksum = Checksum.fromJson(expectedChecksumJson)
+      // Get what we computed while the main file was being read locally.
+      // `in` is the CheckedSequentialInputStream we created.
+      val computedChecksumValue = in.asInstanceOf[CheckedInputStream].getChecksum.getValue.toInt
+
+      logInfo(log"Verifying checksum for file ${MDC(PATH, path)}, " +
+        log"remainingBytesRead= ${MDC(NUM_BYTES, remainingBytesRead)}. " +
+        log"Computed(checksum= ${MDC(CHECKSUM, computedChecksumValue)}, " +
+        log"fileSize= ${MDC(NUM_BYTES, computedFileSize)}). " +
+        log"Checksum file content: ${MDC(CHECKSUM, expectedChecksumJson)}")
+
+      verified = true
+
+      // Compare file size too, in case of collision
+      if (expectedChecksum.value.toInt != computedChecksumValue ||
+        expectedChecksum.mainFileSize != computedFileSize) {
+        throw QueryExecutionErrors.checkpointFileChecksumVerificationFailed(

Review Comment:
   Can we move this to `StateStoreErrors.scala`?
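For context on this suggestion, a hypothetical shape for the relocated helper, following the pattern of other error constructors in `StateStoreErrors.scala`; the method signature, error class name, and parameters below are illustrative, not the PR's actual definition:

    // Hypothetical sketch only; requires org.apache.spark.SparkException.
    def checkpointFileChecksumVerificationFailed(
        path: String,
        expectedChecksum: String,
        computedChecksum: String): SparkException = {
      new SparkException(
        errorClass = "CHECKPOINT_FILE_CHECKSUM_VERIFICATION_FAILED",
        messageParameters = Map(
          "path" -> path,
          "expectedChecksum" -> expectedChecksum,
          "computedChecksum" -> computedChecksum),
        cause = null)
    }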
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org