Stephan Ewen created FLINK-9751:
-----------------------------------

             Summary: Add a RecoverableWriter to the FileSystem abstraction
                 Key: FLINK-9751
                 URL: https://issues.apache.org/jira/browse/FLINK-9751
             Project: Flink
          Issue Type: Sub-task
    Affects Versions: 1.6.0
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen


The core operation of the StreamingFileSink is to append result data to 
(hidden) "in progress" files and then, when the files should roll over, publish 
them as visible files. At each checkpoint, all data written so far must be 
persisted in the "in progress" files. On recovery, we resume the "in progress" file at 
the exact position of the checkpoint, or publish up to the position of that 
checkpoint.

In order to support various file systems and object stores, we need an 
interface that captures these core operations and allows for different 
implementations (such as file truncate/append on POSIX, MultiPartUpload on S3, 
...).
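To make the POSIX variant more concrete, here is a minimal sketch in plain Java NIO. It is not Flink code: the class `LocalInProgressFile`, its `ResumePoint` record and the `.<name>.inprogress` naming are hypothetical. It assumes a local file system where `FileChannel.truncate` is supported and a rename within one directory is atomic. It shows the three core operations described above: appending to a hidden file, persisting at a checkpoint (fsync plus remembering the length), and resuming at exactly that position by truncating anything written afterwards.

{code:java}
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/**
 * Hypothetical sketch (not Flink code): a hidden "in progress" file on a local
 * POSIX-style file system, with persist (fsync + remember length), recover
 * (truncate back to a persisted length) and commit (atomic rename).
 */
final class LocalInProgressFile implements AutoCloseable {

    /** Hypothetical resume handle: hidden file, final target, persisted length. */
    record ResumePoint(Path inProgressFile, Path target, long position) {}

    private final Path inProgressFile;
    private final Path target;
    private final FileChannel channel;

    private LocalInProgressFile(Path inProgressFile, Path target, FileChannel channel) {
        this.inProgressFile = inProgressFile;
        this.target = target;
        this.channel = channel;
    }

    /** Starts a new hidden file next to the target, e.g. "dir/.part-0.inprogress". */
    static LocalInProgressFile open(Path target) throws IOException {
        Path hidden = target.resolveSibling("." + target.getFileName() + ".inprogress");
        FileChannel ch = FileChannel.open(hidden, StandardOpenOption.CREATE_NEW, StandardOpenOption.WRITE);
        return new LocalInProgressFile(hidden, target, ch);
    }

    /** Reopens the hidden file and discards everything written after the persisted position. */
    static LocalInProgressFile recover(ResumePoint point) throws IOException {
        FileChannel ch = FileChannel.open(point.inProgressFile(), StandardOpenOption.WRITE);
        ch.truncate(point.position());
        ch.position(point.position()); // continue appending at the checkpointed position
        return new LocalInProgressFile(point.inProgressFile(), point.target(), ch);
    }

    void write(String data) throws IOException {
        ByteBuffer buf = ByteBuffer.wrap(data.getBytes(StandardCharsets.UTF_8));
        while (buf.hasRemaining()) {
            channel.write(buf);
        }
    }

    /** Forces written bytes to disk and returns a handle to resume at the current position. */
    ResumePoint persist() throws IOException {
        channel.force(true);
        return new ResumePoint(inProgressFile, target, channel.position());
    }

    /** Makes all data durable, closes the file, and publishes it under the target name. */
    void closeAndCommit() throws IOException {
        channel.force(true);
        channel.close();
        Files.move(inProgressFile, target, StandardCopyOption.ATOMIC_MOVE);
    }

    /** Closes without publishing; used here to simulate a failure. */
    @Override
    public void close() throws IOException {
        channel.close();
    }
}
{code}

In this model a checkpoint would call persist() and store the returned ResumePoint in state; bytes written after that point are dropped on recovery. An object store without truncate, such as S3, would need a different mechanism (for example MultiPartUpload), which is why the operations are hidden behind an interface.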

Proposed interface:
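{code:java}
import java.io.IOException;

import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.Path;

/**
 * A handle to an in-progress stream with a defined and persistent amount of data.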
 * The handle can be used to recover the stream and publish the result file.
 */
interface CommitRecoverable { ... }

/**
 * A handle to an in-progress stream with a defined and persistent amount of data.
 * The handle can be used to recover the stream and either publish the result file
 * or keep appending data to the stream.
 */
interface ResumeRecoverable extends CommitRecoverable { ... }

/**
 * An output stream to a file system that can be recovered at well-defined points.
 * The stream initially writes to hidden files or temp files and only creates the
 * target file once it is closed and "committed".
 */
public abstract class RecoverableFsDataOutputStream extends FSDataOutputStream {

    /**
     * Ensures all data so far is persistent (similar to {@link #sync()}) and returns
     * a handle to recover the stream at the current position.
     */
    public abstract ResumeRecoverable persist() throws IOException;

    /**
     * Closes the stream, ensuring persistence of all data (similar to {@link #sync()}).
     * This returns a Committer that can be used to publish (make visible) the file
     * that the stream was writing to.
     */
    public abstract Committer closeForCommit() throws IOException;

    /**
     * A committer can publish the file of a stream that was closed.
     * The Committer can be recovered via a {@link CommitRecoverable}.
     */
    public interface Committer {

        void commit() throws IOException;

        CommitRecoverable getRecoverable();
    }
}

/**
 * The RecoverableWriter creates and recovers RecoverableFsDataOutputStream.
 */
public interface RecoverableWriter {

    RecoverableFsDataOutputStream open(Path path) throws IOException;

    RecoverableFsDataOutputStream recover(ResumeRecoverable resumable) throws IOException;

    RecoverableFsDataOutputStream.Committer recoverForCommit(CommitRecoverable resumable)
            throws IOException;
}
{code}
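The Committer side of the proposal can be illustrated the same way. The following is a hypothetical, self-contained model for a local file system; `LocalFileCommitter` and its `CommitHandle` record are illustrative names, not the proposed Flink types. It assumes that after a failure a restored handle may be committed again even though the earlier commit already succeeded, so commit() treats an already-published file of the expected length as done. It also truncates to the recorded length, which covers publishing "up to the position of that checkpoint" when later bytes were written.

{code:java}
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.nio.file.StandardOpenOption;

/**
 * Hypothetical sketch (not Flink code) of a local-file committer that can be
 * rebuilt from a small, checkpointable handle after a failure.
 */
final class LocalFileCommitter {

    /** Hypothetical stand-in for a CommitRecoverable: hidden file, target, bytes to publish. */
    record CommitHandle(Path inProgressFile, Path target, long length) {}

    private final CommitHandle handle;

    LocalFileCommitter(CommitHandle handle) {
        this.handle = handle;
    }

    /** Counterpart of recoverForCommit(...): rebuilds the committer from restored state. */
    static LocalFileCommitter recoverForCommit(CommitHandle handle) {
        return new LocalFileCommitter(handle);
    }

    /** The handle a sink would store in its checkpoint before committing. */
    CommitHandle getRecoverable() {
        return handle;
    }

    /** Publishes exactly handle.length() bytes under the target name. */
    void commit() throws IOException {
        if (Files.exists(handle.inProgressFile())) {
            try (FileChannel ch = FileChannel.open(handle.inProgressFile(), StandardOpenOption.WRITE)) {
                if (ch.size() < handle.length()) {
                    throw new IOException("Persisted data is missing from " + handle.inProgressFile());
                }
                // Drop bytes written after the checkpointed position before publishing.
                ch.truncate(handle.length());
                ch.force(true);
            }
            Files.move(handle.inProgressFile(), handle.target(), StandardCopyOption.ATOMIC_MOVE);
        } else if (!Files.exists(handle.target()) || Files.size(handle.target()) != handle.length()) {
            throw new IOException("In-progress file is gone and " + handle.target()
                    + " does not hold the expected " + handle.length() + " bytes");
        }
        // Otherwise an earlier commit already published the file: nothing left to do.
    }
}
{code}

One plausible flow under these assumptions: closeForCommit() produces the committer, the sink checkpoints getRecoverable(), calls commit() once that checkpoint is confirmed, and on restore calls recoverForCommit(handle).commit() with the restored handle.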



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to