[ https://issues.apache.org/jira/browse/FLINK-9751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stephan Ewen updated FLINK-9751:
--------------------------------
Component/s: Streaming Connectors

Add a RecoverableWriter to the FileSystem abstraction
-----------------------------------------------------

Key: FLINK-9751
URL: https://issues.apache.org/jira/browse/FLINK-9751
Project: Flink
Issue Type: Sub-task
Components: Streaming Connectors
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Priority: Major
Fix For: 1.6.0

The core operation of the StreamingFileSink is to append result data to (hidden) "in progress" files and then, when the files should roll over, publish them as visible files. At each checkpoint, all data written so far must be persisted in the "in progress" files. On recovery, we either resume the "in progress" file at the exact position of the checkpoint, or publish it up to the position of that checkpoint.

In order to support various file systems and object stores, we need an interface that captures these core operations and allows for different implementations (such as file truncate/append on POSIX, MultiPartUpload on S3, ...).

Proposed interface:

{code:java}
/**
 * A handle to an in-progress stream with a defined and persistent amount of data.
 * The handle can be used to recover the stream and publish the result file.
 */
interface CommitRecoverable { ... }

/**
 * A handle to an in-progress stream with a defined and persistent amount of data.
 * The handle can be used to recover the stream and either publish the result file
 * or keep appending data to the stream.
 */
interface ResumeRecoverable extends CommitRecoverable { ... }

/**
 * An output stream to a file system that can be recovered at well-defined points.
 * The stream initially writes to hidden files or temp files and only creates the
 * target file once it is closed and "committed".
 */
public abstract class RecoverableFsDataOutputStream extends FSDataOutputStream {

    /**
     * Ensures all data so far is persistent (similar to {@link #sync()}) and returns
     * a handle to recover the stream at the current position.
     */
    public abstract ResumeRecoverable persist() throws IOException;

    /**
     * Closes the stream, ensuring persistence of all data (similar to {@link #sync()}).
     * This returns a Committer that can be used to publish (make visible) the file
     * that the stream was writing to.
     */
    public abstract Committer closeForCommit() throws IOException;

    /**
     * A committer can publish the file of a stream that was closed.
     * The Committer can be recovered via a {@link CommitRecoverable}.
     */
    public interface Committer {

        void commit() throws IOException;

        CommitRecoverable getRecoverable();
    }
}

/**
 * The RecoverableWriter creates and recovers RecoverableFsDataOutputStream.
 */
public interface RecoverableWriter {

    RecoverableFsDataOutputStream open(Path path) throws IOException;

    RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;

    RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable recoverable) throws IOException;
}
{code}

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
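The "file truncate/append on POSIX" strategy mentioned in the description can be sketched as below. This is a minimal sketch under stated assumptions, not Flink's implementation: it uses plain `java.nio` instead of Flink's `FileSystem` and `FSDataOutputStream` types, collapses `ResumeRecoverable` and `CommitRecoverable` into one hypothetical `Recoverable` handle (target path, hidden file, durable byte length), and folds `recoverForCommit()` plus `Committer.commit()` into one hypothetical `commit()` method. The names `RecoverableWriterSketch`, `Recoverable` and `Stream` are illustrative only.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/** Hypothetical sketch of the POSIX truncate/append strategy; not Flink's actual classes. */
final class RecoverableWriterSketch {
    /** Stands in for both ResumeRecoverable and CommitRecoverable (assumption of this sketch). */
    static final class Recoverable {
        final Path target;     // visible file that commit() publishes
        final Path inProgress; // hidden file that receives the appended data
        final long length;     // number of bytes known to be durable
        Recoverable(Path target, Path inProgress, long length) {
            this.target = target;
            this.inProgress = inProgress;
            this.length = length;
        }
    }

    /** Simplified stand-in for RecoverableFsDataOutputStream. */
    static final class Stream {
        private final Path target;
        private final Path inProgress;
        private final FileChannel channel;
        Stream(Path target, Path inProgress, FileChannel channel) {
            this.target = target;
            this.inProgress = inProgress;
            this.channel = channel;
        }
        void write(byte[] data) throws IOException {
            ByteBuffer buf = ByteBuffer.wrap(data);
            while (buf.hasRemaining()) {
                channel.write(buf); // writes at the current position, then advances it
            }
        }
        /** Like persist(): force the data to disk and record the current length. */
        Recoverable persist() throws IOException {
            channel.force(true);
            return new Recoverable(target, inProgress, channel.position());
        }
        /** Like closeForCommit(): persist, then close; the handle is all a committer needs. */
        Recoverable closeForCommit() throws IOException {
            Recoverable r = persist();
            channel.close();
            return r;
        }
        void close() throws IOException {
            channel.close();
        }
    }

    /** Like open(): write to a hidden sibling of the target, never to the target itself. */
    Stream open(Path target) throws IOException {
        Path inProgress = target.resolveSibling("." + target.getFileName() + ".inprogress");
        FileChannel ch = FileChannel.open(inProgress, StandardOpenOption.CREATE,
                StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
        return new Stream(target, inProgress, ch);
    }

    /** Like recover(): drop bytes written after the checkpoint, then keep appending. */
    Stream recover(Recoverable r) throws IOException {
        FileChannel ch = FileChannel.open(r.inProgress, StandardOpenOption.WRITE);
        ch.truncate(r.length);
        ch.position(r.length);
        return new Stream(r.target, r.inProgress, ch);
    }

    /** Like recoverForCommit() followed by commit(): truncate, then publish via atomic rename. */
    void commit(Recoverable r) throws IOException {
        if (!Files.exists(r.inProgress) && Files.exists(r.target)) {
            return; // assumption: an earlier commit attempt already published the file
        }
        try (FileChannel ch = FileChannel.open(r.inProgress, StandardOpenOption.WRITE)) {
            ch.truncate(r.length);
            ch.force(true);
        }
        Files.move(r.inProgress, r.target, StandardCopyOption.ATOMIC_MOVE);
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempDirectory("sink").resolve("part-0");
        RecoverableWriterSketch writer = new RecoverableWriterSketch();
        Stream out = writer.open(target);
        out.write("a".getBytes(StandardCharsets.UTF_8));
        Recoverable checkpoint = out.persist();          // checkpoint: "a" is durable
        out.write("b".getBytes(StandardCharsets.UTF_8)); // written after the checkpoint
        out.close();                                     // simulated failure
        Stream resumed = writer.recover(checkpoint);     // truncates "b" away
        resumed.write("c".getBytes(StandardCharsets.UTF_8));
        writer.commit(resumed.closeForCommit());
        System.out.println(new String(Files.readAllBytes(target), StandardCharsets.UTF_8)); // prints "ac"
    }
}
```

In this sketch a checkpoint handle records only a byte length: recovery truncates the hidden file back to that length, so data written after the last checkpoint is discarded rather than duplicated, and the target file becomes visible only through a single rename at commit time. Per the description, an object store such as S3 would back the same interface with a different mechanism (MultiPartUpload).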