On 11 Aug 2018, at 17:33, chandan prakash <[email protected]> wrote:
> Hi All,
> I was going through this pull request about the new CheckpointFileManager abstraction in structured streaming coming in 2.4:
> https://issues.apache.org/jira/browse/SPARK-23966
> https://github.com/apache/spark/pull/21048
> I went through the code in detail and found it will introduce a very nice abstraction which is much cleaner and extensible for direct-write file systems like S3 (in addition to the current HDFS file system). But I am unable to understand: is it really solving some problem in the existing State Store code in Spark 2.3?
>
> My questions relate to these statements about the State Store code:
>
> PR description:: "Checkpoint files must be written atomically such that no partial files are generated."
>
> QUESTION: When are partial files generated in the current code? I can see that data is first written to a temp-delta file and then renamed to the version.delta file. If something bad happens, the task will fail due to the thrown exception and abort() will be called on the store to close and delete tempDeltaFileStream. I think it is quite clean; in what case might partial files be generated?

I suspect the issue is that as files are written to a "classic" Posix store, flush/sync operations can result in the intermediate data being visible to others. Which is why the convention for checkpointing/commit operations is: write to temp & rename. Which is not what you want for object stores, especially S3.

> PR description:: "State Store behavior is incorrect - HDFS FileSystem implementation does not have atomic rename"
>
> QUESTION: The HDFS filesystem rename operation is atomic. I think the above line is about checking whether the file already exists and then taking the appropriate action, which together makes the renaming multi-step and hence non-atomic. But why is this behaviour incorrect? Even if multiple executors try to write to the same version.delta file, only the 1st of them will succeed; the second one will see the file exists and will delete its temp-delta file. Looks good.

HDFS single file and dir rename is atomic; it grabs a lock on the metadata store, does the change, unlocks it. If you are doing any FS op which explicitly renames more than one file in your commit, you lose atomicity. If there's a check + rename then yes, it's two-step, unless you can use create(path, overwrite=false) to create some lease file (sketch below), where you know that the creation is exclusive & atomic for HDFS + Posix, and generally not-at-all for the stores, especially S3, which can actually cache the 404 in its load balancers for a few tens of milliseconds.

For object stores, you are in a different world of pain:

- S3: nope; O(files + data) + observable + partial failures. List inconsistency + caching of negative GET/HEAD to defend against DoS.
- wasb: no, except for bits of the tree where you enable leases, something which increases the cost of operations. O(files), with the odd pause if some shard movement has to take place.
- Google GCS: not sure, but it is O(files).
- Azure abfs: not atomic yet. As the code says:

    if (isAtomicRenameKey(source.getName())) {
      LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
          + " create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
    }

From my reading of the SPARK-23966 PR, it's the object store problem which is being addressed - both correctness and performance.

> Anything I am missing here? Really curious to know which corner cases we are trying to solve by this new pull request?

Object stores as the back end.
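To make that concrete, here is a minimal sketch of the classic write-to-temp-and-rename commit, plus the exclusive create(path, overwrite=false) lease trick, using only the stock Hadoop FileSystem API. The class, method and path names are mine, for illustration only; this is not the actual Spark state store code.

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileAlreadyExistsException;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RenameBasedCommit {

      // Write the checkpoint data to a temp file, then rename it into place.
      // On HDFS the rename of a single file is atomic, so readers see either
      // the old state or the complete new file, never a half-written one.
      static void commit(FileSystem fs, Path finalFile, byte[] data) throws IOException {
        Path temp = new Path(finalFile.getParent(), finalFile.getName() + ".tmp");
        FSDataOutputStream out = fs.create(temp, true);   // overwrite any stale temp file
        try {
          out.write(data);
        } finally {
          out.close();
        }
        // Check-then-rename is two steps, so two writers can race here; the
        // loser of the rename just cleans up its own temp file.
        if (!fs.rename(temp, finalFile)) {
          fs.delete(temp, false);                         // someone else already committed
        }
      }

      // The create(path, overwrite=false) trick: on HDFS/Posix this create is
      // exclusive and atomic, so it can serve as a lease/lock file. Object
      // stores, S3 in particular, give you no such guarantee.
      static boolean tryAcquireLease(FileSystem fs, Path leaseFile) throws IOException {
        try {
          fs.create(leaseFile, false).close();
          return true;
        } catch (FileAlreadyExistsException e) {
          return false;                                   // someone else holds the lease
        }
      }
    }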
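And, as I read the PR, the contrasting pattern for object stores is to skip the rename entirely: write straight to the final name and treat close() as the commit point. Again a sketch of the idea only, not the PR's CheckpointFileManager code, and it glosses over the cancellation/abort handling a real implementation needs:

    import java.io.IOException;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DirectWriteCommit {

      // Write directly to the destination: no temp file, no rename.
      // Against S3 the object only becomes visible once the upload completes
      // in close(), so there is no partially written file for a reader to see,
      // and no O(data) rename to pay for afterwards.
      static void commit(FileSystem fs, Path finalFile, byte[] data) throws IOException {
        FSDataOutputStream out = fs.create(finalFile, true);
        try {
          out.write(data);
        } finally {
          out.close();   // the commit point: atomic, potentially slow, and if it
                         // fails the data may simply never appear at the destination
        }
      }
    }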
For S3 in particular, that rename is O(data), while a direct PUT to the destination gives you that atomicity. Someone needs to sit down and write that reference implementation. For whoever does want to do that:

- I believe it can all be done with the normal Hadoop FS APIs, simply knowing that for these stores OutputStream.close() is (a) atomic, (b) potentially really slow as the remaining data gets uploaded, and (c) when it fails, can mean all your data just got lost.
- I've got the TLA+ spec for the S3 API which they can use as the foundation for their proofs of correctness: https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf

-Steve
