Thanks a lot Steve and Jungtaek for your answers. Steve, you explained it
really well and in depth.

I understood that the existing old implementation was not correct for object
stores like S3, and that the new implementation will address that. For better
performance we should prefer a Direct Write based checkpoint over a Rename
based one (which we can implement using the new CheckpointFileManager
abstraction); the sketch below contrasts the two commit styles.
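For illustration, here is a minimal, hypothetical sketch of the two styles
against the plain Hadoop FileSystem API (the helper names are mine, not the
actual CheckpointFileManager code):

    import org.apache.hadoop.fs.{FileSystem, Path}

    object CommitSketches {

      // Rename-based commit: write to a temp file, then rename into place.
      // Safe on HDFS, where rename is an atomic metadata operation; on S3,
      // rename is copy + delete, i.e. O(data) and observable mid-flight.
      def renameBasedCommit(fs: FileSystem, dest: Path, bytes: Array[Byte]): Unit = {
        val tmp = new Path(dest.getParent, "." + dest.getName + ".tmp")
        val out = fs.create(tmp, true)
        try out.write(bytes) finally out.close()
        if (!fs.rename(tmp, dest)) {
          fs.delete(tmp, false)
          throw new java.io.IOException("rename to " + dest + " failed")
        }
      }

      // Direct-write commit: stream straight to the destination and rely on
      // close() publishing the data. On S3 the PUT completes inside close(),
      // so nothing is visible at dest until close() returns successfully.
      def directWriteCommit(fs: FileSystem, dest: Path, bytes: Array[Byte]): Unit = {
        val out = fs.create(dest, false)
        try {
          out.write(bytes)
          out.close()
        } catch {
          case e: Throwable =>
            try out.close() catch { case _: Throwable => () } // ignore secondary failure
            fs.delete(dest, false) // best-effort cleanup of any partial object
            throw e
        }
      }
    }

The direct-write path is only correct on a store whose close() is atomic,
which is the kind of per-file-system guarantee the new abstraction is meant
to capture.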
My confusion was because of this line in the PR: *This is incorrect as rename
is not atomic in HDFS FileSystem implementation*. I thought the above line
meant that the existing old implementation is not correct for the HDFS file
system as well, so I wanted to understand if there is something I am missing.
The new implementation is for addressing the issue of object stores like S3,
not HDFS.

Thanks again for your explanation, I am sure it will help a lot of other code
readers as well.

Regards,
Chandan

On Mon, Oct 1, 2018 at 5:37 PM Steve Loughran <ste...@hortonworks.com> wrote:

> On 11 Aug 2018, at 17:33, chandan prakash <chandanbaran...@gmail.com> wrote:
>
>> Hi All,
>> I was going through this pull request about the new CheckpointFileManager
>> abstraction in structured streaming coming in 2.4:
>> https://issues.apache.org/jira/browse/SPARK-23966
>> https://github.com/apache/spark/pull/21048
>>
>> I went through the code in detail and found it will introduce a very nice
>> abstraction which is much cleaner and extensible for Direct Write file
>> systems like S3 (in addition to the current HDFS file system).
>>
>> *But I am unable to understand: is it really solving some problem in the
>> existing State Store code in Spark 2.3?*
>>
>> *My questions related to the above statements in the State Store code:*
>>
>> *PR description*: "Checkpoint files must be written atomically such that
>> *no partial files are generated*."
>> *QUESTION*: When are partial files generated in the current code? I can
>> see that data is first written to a temp-delta file and then renamed to
>> the version.delta file. If something bad happens, the task will fail due
>> to the thrown exception and abort() will be called on the store to close
>> and delete tempDeltaFileStream. I think it is quite clean; in what case
>> might partial files be generated?
>
> I suspect the issue is that as files are written to a "classic" POSIX
> store, flush/sync operations can result in the intermediate data being
> visible to others. Which is why the convention for checkpointing/commit
> operations is: write to temp & rename. Which is not what you want for
> object stores, especially S3.
>
>> *PR description*: *State Store behavior is incorrect - HDFS FileSystem
>> implementation does not have atomic rename*
>> *QUESTION*: The HDFS rename operation is atomic. I think the above line
>> takes into account checking whether the file exists and then taking the
>> appropriate action, which together makes the file renaming operation
>> multi-step and hence non-atomic. But why is this behaviour incorrect?
>> Even if multiple executors try to write to the same version.delta file,
>> only the first of them will succeed; the second one will see that the
>> file exists and will delete its temp-delta file. Looks good.
>
> HDFS single file and dir rename is atomic; it grabs a lock on the metadata
> store, does the change, unlocks it. If you are doing any FS op which
> explicitly renames more than one file in your commit, you lose atomicity.
> If there's a check + rename then yes, it's two-step, unless you can use
> create(path, overwrite=false) to create some lease file where you know
> that the creation is exclusive & atomic: true for HDFS + POSIX, generally
> not-at-all for the stores, especially S3, which can actually cache the 404
> in its load balancers for a few tens of milliseconds.
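A hedged sketch of that exclusive-create trick (the helper is hypothetical,
and the pattern is only safe where create(path, overwrite=false) really is
exclusive and atomic, i.e. HDFS and POSIX, not S3):

    import org.apache.hadoop.fs.{FileAlreadyExistsException, FileSystem, Path}

    // "First writer wins": use create(path, overwrite = false) as an atomic,
    // exclusive commit marker. HDFS raises FileAlreadyExistsException when
    // the file exists; some file systems throw a plain IOException instead.
    def tryAcquireCommitMarker(fs: FileSystem, marker: Path): Boolean =
      try {
        fs.create(marker, false).close()
        true  // we created the marker, so this attempt owns the commit
      } catch {
        case _: FileAlreadyExistsException =>
          false // another writer got there first; clean up and back off
      }

On S3 there is no such exclusivity, and as Steve notes the 404 from a prior
existence check may itself be cached, so the pattern is unsafe there.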
> For object stores, you are in a different world of pain:
>
> S3: nope; O(files + data) + observable + partial failures. List
> inconsistency + caching of negative GET/HEAD to defend against DoS.
> wasb: no, except for bits of the tree where you enable leases, something
> which increases the cost of operations. O(files), with the odd pause if
> some shard movement has to take place.
> Google GCS: not sure, but it is O(files).
> Azure abfs: not atomic yet. As the code says:
>
>     if (isAtomicRenameKey(source.getName())) {
>       LOG.warn("The atomic rename feature is not supported by the ABFS scheme; however rename,"
>           + " create and delete operations are atomic if Namespace is enabled for your Azure Storage account.");
>     }
>
> From my reading of the SPARK-23966 PR, it's the object store problem which
> is being addressed: both correctness and performance.
>
>> Anything I am missing here?
>> Really curious to know which corner cases we are trying to solve by this
>> new pull request?
>
> Object stores as the back end. For S3 in particular, where that rename is
> O(data) and a direct PUT to the destination gives you that atomicity.
>
> Someone needs to sit down and write that reference implementation.
> Whoever does want to do that:
>
> - I believe it can all be done with the normal Hadoop FS APIs, simply
> knowing that for the store, OutputStream.close() is (a) atomic, (b)
> potentially really slow as the remaining data gets uploaded and (c) when
> it fails, can mean all your data just got lost.
> - I've got the TLA+ spec for the S3 API which they can use as the
> foundation for their proofs of correctness:
> https://issues.apache.org/jira/secure/attachment/12865161/objectstore.pdf
>
> -Steve

--
Chandan Prakash