    @@ -0,0 +1,397 @@
    +package org.apache.flink.streaming.api.functions.sink.filesystem;
    +import org.apache.flink.annotation.VisibleForTesting;
    +import org.apache.flink.api.common.serialization.Writer;
    +import org.apache.flink.api.common.state.ListState;
    +import org.apache.flink.api.common.state.ListStateDescriptor;
    +import org.apache.flink.api.common.state.OperatorStateStore;
    +import org.apache.flink.api.common.typeutils.base.LongSerializer;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.core.fs.FileSystem;
    +import org.apache.flink.core.fs.Path;
    +import org.apache.flink.core.fs.ResumableWriter;
    +import org.apache.flink.core.io.SimpleVersionedSerializer;
    +import org.apache.flink.runtime.state.CheckpointListener;
    +import org.apache.flink.runtime.state.FunctionInitializationContext;
    +import org.apache.flink.runtime.state.FunctionSnapshotContext;
    +import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
    +import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
    +import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeCallback;
    +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
    +import org.apache.flink.util.Preconditions;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +import java.io.IOException;
    +import java.util.HashMap;
    +import java.util.Iterator;
    +import java.util.Map;
    + * Sink that emits its input elements to {@link FileSystem} files within 
buckets. This is
    + * integrated with the checkpointing mechanism to provide exactly once 
    + *
    + *
    + * <p>When creating the sink a {@code basePath} must be specified. The 
base directory contains
    + * one directory for every bucket. The bucket directories themselves 
contain several part files,
    + * with at least one for each parallel subtask of the sink which is 
writing data to that bucket.
    + * These part files contain the actual output data.
    + *
    + *
    + * <p>The sink uses a {@link Bucketer} to determine in which bucket 
directory each element should
    + * be written to inside the base directory. The {@code Bucketer} can, for 
example, use time or
    + * a property of the element to determine the bucket directory. The 
default {@code Bucketer} is a
    + * {@link DateTimeBucketer} which will create one new bucket every hour. 
You can specify
    + * a custom {@code Bucketer} using {@link #setBucketer(Bucketer)}.
    + *
    + *
    + * <p>The filenames of the part files contain the part prefix, "part-", 
the parallel subtask index of the sink
    + * and a rolling counter. For example the file {@code "part-1-17"} 
contains the data from
    + * {@code subtask 1} of the sink and is the {@code 17th} bucket created by 
that subtask.
    + * When a part file becomes bigger than the user-specified part size or 
when the part file becomes older
    + * than the user-specified roll over interval the current part file is 
closed, the part counter is increased
    + * and a new part file is created. The batch size defaults to {@code 
384MB}, this can be configured
    + * using {@link #setPartFileSize(long)}. The roll over interval defaults 
to {@code Long.MAX_VALUE} and
    + * this can be configured using {@link #setRolloverInterval(long)}.
    + *
    + *
    + * <p>In some scenarios, the open buckets are required to change based on 
time. In these cases, the sink
    + * needs to determine when a bucket has become inactive, in order to flush 
and close the part file.
    + * To support this there are two configurable settings:
    + * <ol>
    + *     <li>the frequency to check for inactive buckets, configured by 
{@link #setBucketCheckInterval(long)}, and</li>
    + *     <li>the minimum amount of time a bucket has to not receive any data 
before it is considered inactive,
    + *     configured by {@link #setInactivityInterval(long)}.</li>
    + * </ol>
    + * Both of these parameters default to {@code 60, 000 ms}, or {@code 1 
    + *
    + *
    + * <p>Part files can be in one of three states: {@code in-progress}, 
{@code pending} or {@code finished}.
    + * The reason for this is how the sink works together with the 
checkpointing mechanism to provide exactly-once
    + * semantics and fault-tolerance. The part file that is currently being 
written to is {@code in-progress}. Once
    + * a part file is closed for writing it becomes {@code pending}. When a 
checkpoint is successful the currently
    + * pending files will be moved to {@code finished}.
    + *
    + *
    + * <p>If case of a failure, and in order to guarantee exactly-once 
semantics, the sink should roll back to the state it
    + * had when that last successful checkpoint occurred. To this end, when 
restoring, the restored files in {@code pending}
    + * state are transferred into the {@code finished} state while any {@code 
in-progress} files are rolled back, so that
    + * they do not contain data that arrived after the checkpoint from which 
we restore.
    + *
    + * <p><b>NOTE:</b>
    + * <ol>
    + *     <li>
    + *         If checkpointing is not enabled the pending files will never be 
moved to the finished state. In that case,
    + *         the pending suffix/prefix can be set to {@code ""} to make the 
sink work in a non-fault-tolerant way but
    + *         still provide output without prefixes and suffixes.
    + *     </li>
    + *     <li>
    + *         The part files are written using an instance of {@link Writer}. 
By default, a
    + *         {@link StringWriter} is used, which writes the result of {@code 
toString()} for
    + *         every element, separated by newlines. You can configure the 
writer using the
    + *         {@link #setWriter(Writer)}.
    + *     </li>
    + * </ol>
    + *
    + * @param <IN> Type of the elements emitted by this sink
    + */
    +public class StreamingFileSink<IN>
    +           extends RichSinkFunction<IN>
    +           implements CheckpointedFunction, CheckpointListener, 
ProcessingTimeCallback {
    +   private static final long serialVersionUID = 2544039385174378235L;
    +   private static final Logger LOG = 
    +   private static final long DEFAULT_CHECK_INTERVAL = 60L * 1000L;
    +   private static final long DEFAULT_INACTIVITY_INTERVAL = 60L * 1000L;
    +   private static final long DEFAULT_ROLLOVER_INTERVAL = 60L * 1000L;
    +   private static final long DEFAULT_PART_SIZE = 1024L * 1024L * 384L;
    +   private final Path basePath;
    +   private transient ResumableWriter fsWriter;
    +   private transient Clock clock;
    +   private transient ProcessingTimeService processingTimeService;
    +   private Bucketer<IN> bucketer;
    +   private Writer<IN> writer;
    +   private long bucketCheckInterval = DEFAULT_CHECK_INTERVAL;
    +   private long partFileSize = DEFAULT_PART_SIZE;
    +   private long rolloverInterval = DEFAULT_ROLLOVER_INTERVAL;
    +   private long inactivityInterval = DEFAULT_INACTIVITY_INTERVAL;
    +   private transient Map<Path, Bucket<IN>> activeBuckets;
    +   private long initMaxPartCounter = 0L;
    +   private long maxPartCounterUsed = 0L;
    +   private final ListStateDescriptor<byte[]> bucketStateDesc =
    +                   new ListStateDescriptor<>("bucket-states",
    +                                   BytePrimitiveArraySerializer.INSTANCE);
    +   private transient ListState<byte[]> restoredBucketStates;
    +   private final ListStateDescriptor<Long> maxPartCounterStateDesc =
    +                   new ListStateDescriptor<>("max-part-counter",
    +                                   LongSerializer.INSTANCE);
    +   private transient ListState<Long> restoredMaxCounters;
    +   private transient SimpleVersionedSerializer<Bucket.BucketState> 
    +   private final BucketFactory<IN> bucketFactory;
    +   /**
    +    * Creates a new {@code BucketingSink} that writes files to the given 
base directory.
    +    *
    +    *
    +    * <p>This uses a{@link DateTimeBucketer} as {@link Bucketer} and a 
{@link StringWriter} has writer.
    +    * The maximum bucket size is set to 384 MB.
    +    *
    +    * @param basePath The directory to which to write the bucket files.
    +    */
    +   public StreamingFileSink(Path basePath) {
    +           this(basePath, new DefaultBucketFactory<>());
    +   }
    +   @VisibleForTesting
    +   StreamingFileSink(Path basePath, BucketFactory<IN> bucketFactory) {
    +           this.basePath = Preconditions.checkNotNull(basePath);
    +           this.bucketer = new DateTimeBucketer<>();
    +           this.writer = new StringWriter<>();
    +           this.bucketFactory = Preconditions.checkNotNull(bucketFactory);
    +   }
    +   public StreamingFileSink<IN> setWriter(Writer<IN> writer) {
    +           this.writer = Preconditions.checkNotNull(writer);
    +           return this;
    +   }
    +   public StreamingFileSink<IN> setBucketer(Bucketer<IN> bucketer) {
    +           this.bucketer = Preconditions.checkNotNull(bucketer);
    +           return this;
    +   }
    +   public StreamingFileSink<IN> setPartFileSize(long partFileSize) {
    +           this.partFileSize = partFileSize;
    +           return this;
    +   }
    +   public StreamingFileSink<IN>  setBucketCheckInterval(long 
bucketCheckInterval) {
    +           this.bucketCheckInterval = bucketCheckInterval;
    +           return this;
    +   }
    +   public StreamingFileSink<IN> setRolloverInterval(long rolloverInterval) 
    +           this.rolloverInterval = rolloverInterval;
    +           return this;
    +   }
    +   public StreamingFileSink<IN> setInactivityInterval(long 
inactivityInterval) {
    +           this.inactivityInterval = inactivityInterval;
    +           return this;
    +   }
    +   @Override
    +   public void notifyCheckpointComplete(long checkpointId) throws 
Exception {
    +           final Iterator<Map.Entry<Path, Bucket<IN>>> activeBucketIt =
    +                           activeBuckets.entrySet().iterator();
    +           while (activeBucketIt.hasNext()) {
    +                   Bucket<IN> bucket = activeBucketIt.next().getValue();
    +                   bucket.commitUpToCheckpoint(checkpointId);
    +                   if (!bucket.isActive()) {
    +                           // We've dealt with all the pending files and 
the writer for this bucket is not currently open.
    +                           // Therefore this bucket is currently inactive 
and we can remove it from our state.
    +                           activeBucketIt.remove();
    +                   }
    +           }
    +   }
    +   @Override
    +   public void snapshotState(FunctionSnapshotContext context) throws 
Exception {
    +           Preconditions.checkNotNull(restoredBucketStates);
    +           Preconditions.checkNotNull(fsWriter);
    +           Preconditions.checkNotNull(bucketStateSerializer);
    +           restoredBucketStates.clear();
    +           for (Map.Entry<Path, Bucket<IN>> bucketStateEntry : 
activeBuckets.entrySet()) {
    +                   final Bucket<IN> bucket = bucketStateEntry.getValue();
    +                   final Bucket.BucketState bucketState = bucket.snapshot(
    +                                   context.getCheckpointId(),
    +                                   context.getCheckpointTimestamp());
    +           }
    +           restoredMaxCounters.clear();
    +           restoredMaxCounters.add(maxPartCounterUsed);
    +   }
    +   @Override
    +   public void initializeState(FunctionInitializationContext context) 
throws Exception {
    +           initFileSystemWriter();
    +           this.activeBuckets = new HashMap<>();
    +           // Now when restoring, we start fresh. Everything gets 
committed and the state is empty.
    +           // If in the future we want to resume the in-progress files, we 
should make sure that in
    +           // case we receive two states for the same bucket, we merge 
them appropriately. This includes
    +           // keep only one in-progress file and commit the other, and 
commit the pending ones, as they
    +           // were pending for a previous to the last successful 
    +           final OperatorStateStore stateStore = 
    +           restoredBucketStates = stateStore.getListState(bucketStateDesc);
    +           restoredMaxCounters = 
    +           if (context.isRestored()) {
    +                   final int subtaskIndex = 
    +                   LOG.info("Restoring state for the {} (taskIdx={}).", 
getClass().getSimpleName(), subtaskIndex);
    +                   for (long partCounter: restoredMaxCounters.get()) {
    +                           if (partCounter > initMaxPartCounter) {
    +                                   initMaxPartCounter = partCounter;
    +                           }
    +                   }
    +                   final int version = bucketStateSerializer.getVersion();
    I know and I agree, but after a discussion with @StephanEwen, this is a 
limitation of the current SimpleVersionedSerializer. I may have to somehow 
checkpoint also the version.

> Create new StreamingFileSink that works on Flink's FileSystem abstraction
> -------------------------------------------------------------------------
>                 Key: FLINK-9750
>                 URL: https://issues.apache.org/jira/browse/FLINK-9750
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Streaming Connectors
>            Reporter: Stephan Ewen
>            Assignee: Kostas Kloudas
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.6.0
> Using Flink's own file system abstraction means that we can add additional 
> streaming/checkpointing related behavior.
> In addition, the new StreamingFileSink should only rely on internal 
> checkpointed state what files are possibly in progress or need to roll over, 
> never assume enumeration of files in the file system.

