anishshri-db commented on code in PR #50773:
URL: https://github.com/apache/spark/pull/50773#discussion_r2072000866
##########
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ChecksumCheckpointFileManager.scala:
##########
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import java.io.{FileNotFoundException, InputStream}
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.TimeUnit
+import java.util.zip.{CheckedInputStream, CheckedOutputStream, CRC32C}
+
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.io.Source
+
+import com.fasterxml.jackson.annotation.JsonInclude.Include
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
+import org.apache.hadoop.fs._
+import org.json4s.{Formats, NoTypeHints}
+import org.json4s.jackson.Serialization
+
+import org.apache.spark.{SparkEnv, TaskContext}
+import org.apache.spark.SparkException
+import org.apache.spark.internal.{Logging, MDC}
+import org.apache.spark.internal.LogKeys.{CHECKSUM, NUM_BYTES, PATH, TIMEOUT}
+import org.apache.spark.sql.errors.QueryExecutionErrors
+import org.apache.spark.sql.execution.streaming.CheckpointFileManager.CancellableFSDataOutputStream
+import org.apache.spark.util.ThreadUtils
+
+/** Information about the creator of the checksum file. Useful for debugging. */
+case class ChecksumFileCreatorInfo(
+    executorId: String,
+    taskInfo: String)
+
+object ChecksumFileCreatorInfo {
+  def apply(): ChecksumFileCreatorInfo = {
+    val executorId = Option(SparkEnv.get).map(_.executorId).getOrElse("")
+    val taskInfo = Option(TaskContext.get()).map(tc =>
+      s"Task= ${tc.partitionId()}.${tc.attemptNumber()}, " +
+        s"Stage= ${tc.stageId()}.${tc.stageAttemptNumber()}").getOrElse("")
+    new ChecksumFileCreatorInfo(executorId, taskInfo)
+  }
+}
+
+/**
+ * This is the content of the checksum file.
+ * Holds the checksum value and additional information.
+ */
+case class Checksum(
+    algorithm: String,
+    // Making this a string to be agnostic of the algorithm used and easily readable.
+    // We can change this to a byte array later if we start using algos with large values.
+    value: String,
+    mainFileSize: Long,
+    timestampMs: Long,
+    creator: ChecksumFileCreatorInfo) {
+
+  import Checksum._
+
+  def json(): String = {
+    mapper.writeValueAsString(this)
+  }
+}
+
+object Checksum {
+  implicit val format: Formats = Serialization.formats(NoTypeHints)
+
+  /** Used to convert between class and JSON. */
+  lazy val mapper = {
+    val _mapper = new ObjectMapper with ClassTagExtensions
+    _mapper.setSerializationInclusion(Include.NON_ABSENT)
+    _mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    _mapper.registerModule(DefaultScalaModule)
+    _mapper
+  }
+
+  def fromJson(json: String): Checksum = {
+    Serialization.read[Checksum](json)
+  }
+}
+
+/** Holds the path of the checksum file and knows the main file path. */
+case class ChecksumFile(path: Path) {
+  import ChecksumCheckpointFileManager._
+  assert(isChecksumFile(path), "path is not a checksum file")
+
+  val mainFilePath = new Path(path.toString.stripSuffix(CHECKSUM_FILE_SUFFIX))
+
+  /** The name of the file without any extensions, e.g. my-file.txt.crc returns my-file */
+  val baseName: String = path.getName.split("\\.").head
+}
+
+/**
+ * A [[CheckpointFileManager]] that creates a checksum file for the main file.
+ * This wraps another [[CheckpointFileManager]] and adds checksum functionality on top of it.
+ * Under the hood, when a file is created, it also creates a checksum file with the same name as
+ * the main file but adds a suffix. It returns [[ChecksumCancellableFSDataOutputStream]],
+ * which handles the writing of the main file and checksum file.
+ *
+ * When a file is opened, it returns [[ChecksumFSDataInputStream]], which handles reading
+ * the main file and checksum file and does the checksum verification.
+ *
+ * In order to reduce the impact of reading/writing 2 files instead of 1, it uses a thread pool
+ * to read/write both files concurrently.
+ *
+ * @note
+ * It is able to read files written by other [[CheckpointFileManager]]s that don't have checksums.
+ * It automatically deletes the checksum file when the main file is deleted.
+ * If you delete the main file with a different type of manager, then the checksum file will be
+ * left behind (i.e. an orphan checksum file), since those managers don't know about it. It would
+ * be your responsibility to delete the orphan checksum files.
+ *
+ * @param underlyingFileMgr The file manager to use under the hood
+ * @param allowConcurrentDelete If true, allows deleting the main and checksum file concurrently.
+ *                              This is a perf optimization, but can potentially lead to
+ *                              orphan checksum files. If using this, it is your responsibility
+ *                              to clean up the potential orphan checksum files.
+ * @param numThreads The number of threads to use for the thread pool, for reading/writing
+ *                   files. To avoid blocking, if the file manager instance is being used by a
+ *                   single thread, then you can set this to 2 (one thread for the main file,
+ *                   another for the checksum file).
+ *                   If the file manager is shared by multiple threads, you can set it to
+ *                   the number of threads using the file manager * 2.
+ *                   Setting this differently can lead to file operations being blocked waiting
+ *                   for a free thread.
+ */
+class ChecksumCheckpointFileManager(
+    private val underlyingFileMgr: CheckpointFileManager,
+    val allowConcurrentDelete: Boolean = false,
+    val numThreads: Int)
+  extends CheckpointFileManager with Logging {
+  assert(numThreads % 2 == 0, "numThreads must be a multiple of 2, we need 1 for the main file " +
+    "and another for the checksum file")
+
+  import ChecksumCheckpointFileManager._
+
+  // This allows us to concurrently read/write the main file and checksum file
+  private val threadPool = ExecutionContext.fromExecutorService(
+    ThreadUtils.newDaemonFixedThreadPool(numThreads, s"${this.getClass.getSimpleName}-Thread"))
+
+  override def list(path: Path, filter: PathFilter): Array[FileStatus] = {
+    underlyingFileMgr.list(path, filter)
+  }
+
+  override def mkdirs(path: Path): Unit = {
+    underlyingFileMgr.mkdirs(path)
+  }
+
+  override def createAtomic(path: Path,
+      overwriteIfPossible: Boolean): CancellableFSDataOutputStream = {
+    assert(!isChecksumFile(path), "Cannot directly create a checksum file")
+
+    val mainFileFuture = Future {
+      underlyingFileMgr.createAtomic(path, overwriteIfPossible)
+    }(threadPool)
+
+    val checksumFileFuture = Future {
+      underlyingFileMgr.createAtomic(getChecksumPath(path), overwriteIfPossible)
+    }(threadPool)
+
+    new ChecksumCancellableFSDataOutputStream(
+      awaitResult(mainFileFuture, Duration.Inf),
+      path,
+      awaitResult(checksumFileFuture, Duration.Inf),
+      threadPool
+    )
+  }
+
+  override def open(path: Path): FSDataInputStream = {
+    assert(!isChecksumFile(path), "Cannot directly open a checksum file")
+
+    val checksumInputStreamFuture = Future {
+      try {
+        Some(underlyingFileMgr.open(getChecksumPath(path)))
+      } catch {
+        // In case the client previously had file checksum disabled,
+        // then previously created files won't have checksums.
+        case _: FileNotFoundException =>
+          logWarning(log"No checksum file found for ${MDC(PATH, path)}, " +
+            log"hence no checksum verification.")
+          None
+      }
+    }(threadPool)
+
+    val mainInputStreamFuture = Future {
+      underlyingFileMgr.open(path)
+    }(threadPool)
+
+    val mainStream = awaitResult(mainInputStreamFuture, Duration.Inf)
+    val checksumStream = awaitResult(checksumInputStreamFuture, Duration.Inf)
+
+    checksumStream.map { chkStream =>
+      new ChecksumFSDataInputStream(mainStream, path, chkStream, threadPool)
+    }.getOrElse(mainStream)
+  }
+
+  override def exists(path: Path): Boolean = underlyingFileMgr.exists(path)
+
+  override def delete(path: Path): Unit = {
+    // Allow directly deleting the checksum file for the orphan checksum file scenario
+    if (isChecksumFile(path)) {
+      deleteChecksumFile(path)
+    } else if (allowConcurrentDelete) {
+      // Ideally, we should first try to delete the checksum file
+      // before the main file, to avoid a situation where the main file is deleted but the
+      // checksum file deletion failed. The client might not call delete again if the main file
+      // no longer exists.
+      // But if allowConcurrentDelete is enabled, then we can do it concurrently for perf.
+      // The client would then be responsible for cleaning up potential orphan checksum files
+      // if that happens.
+      val checksumInputStreamFuture = Future {
+        deleteChecksumFile(getChecksumPath(path))
+      }(threadPool)
+
+      val mainInputStreamFuture = Future {
+        underlyingFileMgr.delete(path)
+      }(threadPool)
+
+      awaitResult(mainInputStreamFuture, Duration.Inf)
+      awaitResult(checksumInputStreamFuture, Duration.Inf)
+    } else {
+      // First delete the checksum file, then the main file
+      deleteChecksumFile(getChecksumPath(path))
+      underlyingFileMgr.delete(path)
+    }
+  }
+
+  private def deleteChecksumFile(checksumPath: Path): Unit = {
+    try {
+      underlyingFileMgr.delete(checksumPath)
+      logDebug(log"Deleted checksum file ${MDC(PATH, checksumPath)}")
+    } catch {
+      case _: FileNotFoundException =>
+        // Ignore if the file has already been deleted
+        // or the main file was created initially without a checksum
+        logWarning(log"Skipping deletion of checksum file ${MDC(PATH, checksumPath)} " +

Review Comment:
   Will this also apply to existing checkpoints that don't have checksum verification enabled? Should we log only if the feature is enabled?
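
For context, the wrapper streams referenced above (ChecksumCancellableFSDataOutputStream and ChecksumFSDataInputStream) are not shown in this hunk, but the imports suggest they build on java.util.zip's checked streams. Below is a minimal, Spark-independent sketch of that mechanism; the object name and payload are illustrative, not from the PR:

import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{CheckedInputStream, CheckedOutputStream, CRC32C}

object Crc32cRoundTrip {
  def main(args: Array[String]): Unit = {
    // Write path: the checked stream accumulates the CRC32C value as bytes pass through.
    val bytesOut = new ByteArrayOutputStream()
    val checkedOut = new CheckedOutputStream(bytesOut, new CRC32C())
    checkedOut.write("some checkpoint payload".getBytes(StandardCharsets.UTF_8))
    checkedOut.flush()
    // This value (plus metadata) is what a sidecar checksum file would record.
    val expected = checkedOut.getChecksum.getValue

    // Read path: drain the checked input stream, then compare the recomputed value.
    val checkedIn =
      new CheckedInputStream(new ByteArrayInputStream(bytesOut.toByteArray), new CRC32C())
    val buf = new Array[Byte](4096)
    while (checkedIn.read(buf) != -1) {} // reading drives the checksum computation
    val actual = checkedIn.getChecksum.getValue

    assert(actual == expected, s"checksum mismatch: $actual != $expected")
  }
}

A mismatch between the recomputed value and the recorded one is what would surface as a verification failure when the checkpoint file is read back.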
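The manager hides the cost of touching two files per operation by issuing the main-file and checksum-file calls concurrently on its private pool, so the added latency is roughly max(main, checksum) rather than their sum. A standalone sketch of that pattern, assuming a plain JDK executor in place of Spark's ThreadUtils pool (the stand-in operations and names are hypothetical):

import java.util.concurrent.Executors

import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

object DualFileConcurrencySketch {
  // Two threads: one for the main file, one for the checksum file, matching the
  // numThreads % 2 == 0 expectation in the manager above.
  private val pool = ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))

  // Hypothetical stand-ins for the two underlying file operations.
  def openMain(): String = "main-stream"
  def openChecksum(): String = "checksum-stream"

  def main(args: Array[String]): Unit = {
    // Both operations start immediately and run in parallel on the pool.
    val mainF = Future(openMain())(pool)
    val checksumF = Future(openChecksum())(pool)
    // Block until each completes, as the manager does with awaitResult.
    val main = Await.result(mainF, Duration.Inf)
    val checksum = Await.result(checksumF, Duration.Inf)
    println(s"$main + $checksum")
    pool.shutdown()
  }
}

This also illustrates the sizing caveat from the scaladoc: with a fixed pool, each caller needs two free threads, so a pool shared by N callers wants roughly 2 * N threads to avoid one operation queuing behind another.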