gaoyunhaii commented on a change in pull request #13740:
URL: https://github.com/apache/flink/pull/13740#discussion_r516044747



##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A bucket is the directory organization of the output of the {@link FileSink}.
+ *
+ * <p>For each incoming element in the {@code FileSink}, the user-specified
+ * {@link BucketAssigner} is queried to see into which bucket this element should be written.
+ */
+@Internal
+class FileWriterBucket<IN, BucketID> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(FileWriterBucket.class);
+
+       private final BucketID bucketId;
+
+       private final Path bucketPath;
+
+       private final String uniqueId;
+
+       private final BucketWriter<IN, BucketID> bucketWriter;
+
+       private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+       private final OutputFileConfig outputFileConfig;
+
+       private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = new ArrayList<>();
+
+       private long partCounter;
+
+       @Nullable
+       private InProgressFileRecoverable inProgressFileToCleanup;
+
+       @Nullable
+       private InProgressFileWriter<IN, BucketID> inProgressPart;
+
+       /**
+        * Constructor to create a new empty bucket.
+        */
+       private FileWriterBucket(
+                       BucketID bucketId,
+                       Path bucketPath,
+                       String uniqueId,
+                       BucketWriter<IN, BucketID> bucketWriter,
+                       RollingPolicy<IN, BucketID> rollingPolicy,
+                       OutputFileConfig outputFileConfig) {
+               this.bucketId = checkNotNull(bucketId);
+               this.bucketPath = checkNotNull(bucketPath);
+               this.uniqueId = checkNotNull(uniqueId);

Review comment:
       Currently we have no requirement to set it directly, so I moved the parameter to be initialized inside `FileWriterBucket`.
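
       A rough sketch of what that could look like (hypothetical: the exact way `uniqueId` is generated inside the bucket is an assumption here, e.g. reusing the already-imported `RandomStringUtils`):

```java
// Sketch only: uniqueId is created inside the bucket instead of being
// passed in, since callers have no need to set it directly.
private FileWriterBucket(
        BucketID bucketId,
        Path bucketPath,
        BucketWriter<IN, BucketID> bucketWriter,
        RollingPolicy<IN, BucketID> rollingPolicy,
        OutputFileConfig outputFileConfig) {
    this.bucketId = checkNotNull(bucketId);
    this.bucketPath = checkNotNull(bucketPath);
    // Assumption: any collision-free id works; here a random alphanumeric one.
    this.uniqueId = RandomStringUtils.randomAlphanumeric(16);
    this.bucketWriter = checkNotNull(bucketWriter);
    this.rollingPolicy = checkNotNull(rollingPolicy);
    this.outputFileConfig = checkNotNull(outputFileConfig);
    this.partCounter = 0;
}
```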

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriterBucket.java
##########
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+
+import static org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter.InProgressFileRecoverable;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * A bucket is the directory organization of the output of the {@link FileSink}.
+ *
+ * <p>For each incoming element in the {@code FileSink}, the user-specified
+ * {@link BucketAssigner} is queried to see into which bucket this element should be written.
+ */
+@Internal
+class FileWriterBucket<IN, BucketID> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(FileWriterBucket.class);
+
+       private final BucketID bucketId;
+
+       private final Path bucketPath;
+
+       private final String uniqueId;
+
+       private final BucketWriter<IN, BucketID> bucketWriter;
+
+       private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+       private final OutputFileConfig outputFileConfig;
+
+       private final List<InProgressFileWriter.PendingFileRecoverable> pendingFiles = new ArrayList<>();
+
+       private long partCounter;
+
+       @Nullable
+       private InProgressFileRecoverable inProgressFileToCleanup;
+
+       @Nullable
+       private InProgressFileWriter<IN, BucketID> inProgressPart;
+
+       /**
+        * Constructor to create a new empty bucket.
+        */
+       private FileWriterBucket(
+                       BucketID bucketId,
+                       Path bucketPath,
+                       String uniqueId,
+                       BucketWriter<IN, BucketID> bucketWriter,
+                       RollingPolicy<IN, BucketID> rollingPolicy,
+                       OutputFileConfig outputFileConfig) {
+               this.bucketId = checkNotNull(bucketId);
+               this.bucketPath = checkNotNull(bucketPath);
+               this.uniqueId = checkNotNull(uniqueId);
+               this.bucketWriter = checkNotNull(bucketWriter);
+               this.rollingPolicy = checkNotNull(rollingPolicy);
+               this.outputFileConfig = checkNotNull(outputFileConfig);
+
+               this.partCounter = 0;
+       }
+
+       /**
+        * Constructor to restore a bucket from checkpointed state.
+        */
+       private FileWriterBucket(
+                       String uniqueId,
+                       BucketWriter<IN, BucketID> partFileFactory,
+                       RollingPolicy<IN, BucketID> rollingPolicy,
+                       FileWriterBucketState<BucketID> bucketState,
+                       OutputFileConfig outputFileConfig) throws IOException {
+
+               this(
+                               bucketState.getBucketId(),
+                               bucketState.getBucketPath(),
+                               uniqueId,
+                               partFileFactory,
+                               rollingPolicy,
+                               outputFileConfig);
+
+               restoreInProgressFile(bucketState);
+       }
+
+       private void restoreInProgressFile(FileWriterBucketState<BucketID> state) throws IOException {
+               if (!state.hasInProgressFileRecoverable()) {
+                       return;
+               }
+
+               // we try to resume the previous in-progress file
+               InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable =
+                               state.getInProgressFileRecoverable();
+
+               if (bucketWriter.getProperties().supportsResume()) {
+                       inProgressPart = bucketWriter.resumeInProgressFileFrom(
+                                       bucketId, inProgressFileRecoverable, state.getInProgressFileCreationTime());
+               } else {
+                       pendingFiles.add(inProgressFileRecoverable);
+               }
+       }
+
+       public BucketID getBucketId() {
+               return bucketId;
+       }
+
+       public Path getBucketPath() {
+               return bucketPath;
+       }
+
+       public long getPartCounter() {
+               return partCounter;
+       }
+
+       public boolean isActive() {
+               return inProgressPart != null || inProgressFileToCleanup != null || pendingFiles.size() > 0;
+       }
+
+       void merge(final FileWriterBucket<IN, BucketID> bucket) throws IOException {
+               checkNotNull(bucket);
+               checkState(Objects.equals(bucket.bucketPath, bucketPath));
+
+               bucket.closePartFile();
+               pendingFiles.addAll(bucket.pendingFiles);
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Merging buckets for bucket id={}", bucketId);
+               }
+       }
+
+       void write(IN element) throws IOException {
+               long now = System.currentTimeMillis();
+               if (inProgressPart == null || rollingPolicy.shouldRollOnEvent(inProgressPart, element)) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug(
+                                               "closing in-progress part file 
for bucket id={} due to element {}.",
+                                               bucketId,
+                                               element);
+                       }
+
+                       inProgressPart = rollPartFile(now);
+               }
+
+               inProgressPart.write(element, now);
+       }
+
+       List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+               if (inProgressPart != null && (rollingPolicy.shouldRollOnCheckpoint(inProgressPart)
+                               || flush)) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug(
+                                               "Closing in-progress part file 
for bucket id={} on checkpoint.",
+                                               bucketId);
+                       }
+                       closePartFile();
+               }
+
+               List<FileSinkCommittable> committables = new ArrayList<>();
+               pendingFiles.forEach(pendingFile -> committables.add(new FileSinkCommittable(pendingFile)));
+               pendingFiles.clear();
+
+               if (inProgressFileToCleanup != null) {
+                       committables.add(new FileSinkCommittable(inProgressFileToCleanup));
+                       inProgressFileToCleanup = null;
+               }
+
+               return committables;
+       }
+
+       FileWriterBucketState<BucketID> snapshotState() throws IOException {
+               InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = null;
+               long inProgressFileCreationTime = Long.MAX_VALUE;
+
+               if (inProgressPart != null) {
+                       inProgressFileRecoverable = inProgressPart.persist();
+                       inProgressFileToCleanup = inProgressFileRecoverable;
+                       inProgressFileCreationTime = inProgressPart.getCreationTime();
+               }
+
+               return new FileWriterBucketState<>(
+                               bucketId,
+                               bucketPath,
+                               inProgressFileCreationTime,
+                               inProgressFileRecoverable);
+       }
+
+       private InProgressFileWriter<IN, BucketID> rollPartFile(long currentTime) throws IOException {
+               closePartFile();
+
+               final Path partFilePath = assembleNewPartPath();
+
+               if (LOG.isDebugEnabled()) {
+                       LOG.debug("Opening new part file \"{}\" for bucket 
id={}.",
+                                       partFilePath.getName(), bucketId);
+               }
+
+               return bucketWriter.openNewInProgressFile(bucketId, partFilePath, currentTime);
+       }
+
+       /**
+        * Constructs a new part path and increments the partCounter.
+        */
+       private Path assembleNewPartPath() {
+               long currentPartCounter = partCounter++;
+               return new Path(
+                               bucketPath,
+                               outputFileConfig.getPartPrefix() + '-' + uniqueId + '-' + currentPartCounter
+                                               + outputFileConfig.getPartSuffix());
+       }
+
+       private void closePartFile() throws IOException {
+               if (inProgressPart != null) {
+                       InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable = inProgressPart.closeForCommit();
+                       pendingFiles.add(pendingFileRecoverable);
+                       inProgressPart = null;
+               }
+       }
+
+       void disposePartFile() {
+               if (inProgressPart != null) {
+                       inProgressPart.dispose();
+               }
+       }
+
+       // --------------------------- Testing Methods -----------------------------
+
+       @VisibleForTesting
+       public String getUniqueId() {
+               return uniqueId;
+       }
+
+       @Nullable
+       @VisibleForTesting
+       InProgressFileWriter<IN, BucketID> getInProgressPart() {
+               return inProgressPart;
+       }
+

Review comment:
       Added `@VisibleForTesting` to all the testing methods.

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writer implementation for {@link FileSink}.
+ */
+public class FileWriter<IN, BucketID>
+               implements Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
+
+       // ------------------------ configuration fields --------------------------
+
+       private final Path basePath;
+
+       private final FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+       private final BucketAssigner<IN, BucketID> bucketAssigner;
+
+       private final BucketWriter<IN, BucketID> bucketWriter;
+
+       private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+       // --------------------------- runtime fields -----------------------------
+
+       private final BucketerContext bucketerContext;
+
+       private final Map<BucketID, FileWriterBucket<IN, BucketID>> activeBuckets;
+
+       private final OutputFileConfig outputFileConfig;
+
+       // --------------------------- State Related Fields -----------------------------
+
+       private final FileWriterBucketStateSerializer<BucketID> bucketStateSerializer;
+
+       /**
+        * A constructor creating a new empty bucket manager.
+        *
+        * @param basePath The base path for our buckets.
+        * @param bucketAssigner The {@link BucketAssigner} provided by the user.
+        * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets.
+        * @param bucketWriter The {@link BucketWriter} to be used when writing data.
+        * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
+        */
+       public FileWriter(
+                       final Path basePath,
+                       final BucketAssigner<IN, BucketID> bucketAssigner,
+                       final FileWriterBucketFactory<IN, BucketID> bucketFactory,
+                       final BucketWriter<IN, BucketID> bucketWriter,
+                       final RollingPolicy<IN, BucketID> rollingPolicy,
+                       final OutputFileConfig outputFileConfig) {
+
+               this.basePath = checkNotNull(basePath);
+               this.bucketAssigner = checkNotNull(bucketAssigner);
+               this.bucketFactory = checkNotNull(bucketFactory);
+               this.bucketWriter = checkNotNull(bucketWriter);
+               this.rollingPolicy = checkNotNull(rollingPolicy);
+
+               this.outputFileConfig = checkNotNull(outputFileConfig);
+
+               this.activeBuckets = new HashMap<>();
+               this.bucketerContext = new BucketerContext();
+
+               this.bucketStateSerializer = new FileWriterBucketStateSerializer<>(
+                               bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+                               bucketAssigner.getSerializer());
+       }
+
+       /**
+        * Initializes the state after recovery from a failure.
+        *
+        * <p>During this process:
+        * <ol>
+        *     <li>we set the initial value for part counter to the maximum value used before across all tasks and buckets.
+        *     This guarantees that we do not overwrite valid data,</li>
+        *     <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li>
+        *     <li>we resume writing to the previous in-progress file of each bucket, and</li>
+        *     <li>if we receive multiple states for the same bucket, we merge them.</li>
+        * </ol>
+        *
+        * @param bucketStates the state holding recovered state about active buckets.
+        *
+        * @throws Exception if anything goes wrong during retrieving the state or restoring/committing of any
+        *              in-progress/pending part files
+        */
+       public void initializeState(List<FileWriterBucketState<BucketID>> bucketStates) throws IOException {
+               for (FileWriterBucketState<BucketID> state : bucketStates) {
+                       BucketID bucketId = state.getBucketId();
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Restoring: {}", state);
+                       }
+
+                       FileWriterBucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
+                                       bucketWriter,
+                                       rollingPolicy,
+                                       state,
+                                       outputFileConfig);
+
+                       updateActiveBucketId(bucketId, restoredBucket);
+               }
+       }
+
+       private void updateActiveBucketId(
+                       BucketID bucketId,
+                       FileWriterBucket<IN, BucketID> restoredBucket) throws IOException {
+               final FileWriterBucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+               if (bucket != null) {
+                       bucket.merge(restoredBucket);
+               } else {
+                       activeBuckets.put(bucketId, restoredBucket);
+               }
+       }
+
+       @Override
+       public void write(IN element, Context context) throws IOException {
+               // setting the values in the bucketer context
+               bucketerContext.update(
+                               context.timestamp(),
+                               context.currentWatermark());
+
+               final BucketID bucketId = bucketAssigner.getBucketId(element, bucketerContext);
+               final FileWriterBucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
+               bucket.write(element);
+       }
+
+       @Override
+       public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+               List<FileSinkCommittable> committables = new ArrayList<>();
+
+               // Every time before we prepare a commit, we first check and remove the inactive
+               // buckets. Checking the activeness right before pre-committing avoids re-creating
+               // the bucket every time if the bucket uses OnCheckpointRollingPolicy.
+               Iterator<Map.Entry<BucketID, FileWriterBucket<IN, BucketID>>> activeBucketIt =
+                               activeBuckets.entrySet().iterator();
+               while (activeBucketIt.hasNext()) {

Review comment:
       One concern: for a bucket that only has pending files, we check its activeness after sending the committables, so it would always be inactive and we would have to remove it and re-create it in the next checkpoint period. Do you think this would be a problem? If we delayed the check by one checkpoint period, we would only remove a bucket that has not received records for one full checkpoint period.
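
       For illustration, a sketch of that delayed check (hypothetical: the `inactiveAtLastCommit` set and its bookkeeping are assumptions, not part of this PR):

```java
// Assumed extra field: private final Set<BucketID> inactiveAtLastCommit = new HashSet<>();
while (activeBucketIt.hasNext()) {
    Map.Entry<BucketID, FileWriterBucket<IN, BucketID>> entry = activeBucketIt.next();
    FileWriterBucket<IN, BucketID> bucket = entry.getValue();
    committables.addAll(bucket.prepareCommit(flush));
    if (!bucket.isActive()) {
        if (inactiveAtLastCommit.remove(entry.getKey())) {
            // Inactive for one full checkpoint period: safe to drop now.
            activeBucketIt.remove();
        } else {
            // First time seen inactive: keep it around for one more period.
            inactiveAtLastCommit.add(entry.getKey());
        }
    } else {
        inactiveAtLastCommit.remove(entry.getKey());
    }
}
```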

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/committer/FileCommitter.java
##########
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.committer;
+
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Committer implementation for {@link FileSink}.
+ */
+public class FileCommitter implements Committer<FileSinkCommittable> {
+       private static final Logger LOG = LoggerFactory.getLogger(FileCommitter.class);
+
+       private final BucketWriter<?, ?> bucketWriter;
+
+       public FileCommitter(BucketWriter<?, ?> bucketWriter) {
+               this.bucketWriter = checkNotNull(bucketWriter);
+       }
+
+       @Override
+       public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables) {
+               List<FileSinkCommittable> needRetry = new ArrayList<>();
+               for (FileSinkCommittable committable : committables) {
+                       if (committable.hasPendingFile()) {
+                               // We should always use commitAfterRecovery which contains additional checks.
+                               try {
+                                       bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery();
+                               } catch (IOException e) {
+                                       LOG.error("Failed to commit {}", 
committable.getPendingFile());
+                                       needRetry.add(committable);

Review comment:
       Agreed that we should not retry for now; I changed it to return an empty list.
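
       A minimal sketch of the resulting `commit` (assumptions: failures simply propagate, the `Committer#commit` signature permits throwing `IOException`, and `java.util.Collections` is imported):

```java
@Override
public List<FileSinkCommittable> commit(List<FileSinkCommittable> committables) throws IOException {
    for (FileSinkCommittable committable : committables) {
        if (committable.hasPendingFile()) {
            // commitAfterRecovery contains additional checks, so always use it.
            bucketWriter.recoverPendingFile(committable.getPendingFile()).commitAfterRecovery();
        }
    }
    // No retry for now: returning an empty list hands nothing back for re-commit.
    return Collections.emptyList();
}
```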

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+               implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+       private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+       private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+               this.bucketsBuilder = checkNotNull(bucketsBuilder);
+       }
+
+       @Override
+       public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+                       InitContext context,
+                       List<FileWriterBucketState<BucketID>> states) throws IOException {
+               FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+               writer.initializeState(states);
+               return writer;
+       }
+
+       @Override
+       public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+                       throws IOException {
+               return Optional.of(bucketsBuilder.getWriterStateSerializer());
+       }
+
+       @Override
+       public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+               return Optional.of(bucketsBuilder.createCommitter());
+       }
+
+       @Override
+       public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+                       throws IOException {
+               return Optional.of(bucketsBuilder.getCommittableSerializer());
+       }
+
+       @Override
+       public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+               return Optional.empty();
+       }
+
+       @Override
+       public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+               return Optional.empty();
+       }
+
+       public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+                       final Path basePath, final Encoder<IN> encoder) {
+               return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+       }
+
+       /**
+        * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+        */
+       @Internal
+       private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
+                       implements Serializable {
+
+               private static final long serialVersionUID = 1L;
+
+               public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;
+
+               @SuppressWarnings("unchecked")
+               protected T self() {
+                       return (T) this;
+               }
+
+               @Internal
+               protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;
+
+               @Internal
+               protected abstract FileCommitter createCommitter() throws IOException;
+
+               @Internal
+               protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;
+
+               @Internal
+               protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
+       }
+
+       /**
+        * A builder for configuring the sink for row-wise encoding formats.
+        */
+       public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
+                       extends BucketsBuilder<IN, BucketID, T> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final Path basePath;
+
+               private final Encoder<IN> encoder;
+
+               private BucketAssigner<IN, BucketID> bucketAssigner;
+
+               private RollingPolicy<IN, BucketID> rollingPolicy;
+
+               private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+               private OutputFileConfig outputFileConfig;
+
+               protected RowFormatBuilder(
+                               Path basePath,
+                               Encoder<IN> encoder,
+                               BucketAssigner<IN, BucketID> bucketAssigner) {
+                       this(
+                                       basePath,
+                                       encoder,
+                                       bucketAssigner,
+                                       DefaultRollingPolicy.builder().build(),
+                                       new DefaultFileWriterBucketFactory<>(),
+                                       OutputFileConfig.builder().build());
+               }
+
+               protected RowFormatBuilder(
+                               Path basePath,
+                               Encoder<IN> encoder,
+                               BucketAssigner<IN, BucketID> assigner,
+                               RollingPolicy<IN, BucketID> policy,
+                               FileWriterBucketFactory<IN, BucketID> bucketFactory,
+                               OutputFileConfig outputFileConfig) {
+                       this.basePath = checkNotNull(basePath);
+                       this.encoder = checkNotNull(encoder);
+                       this.bucketAssigner = checkNotNull(assigner);
+                       this.rollingPolicy = checkNotNull(policy);
+                       this.bucketFactory = checkNotNull(bucketFactory);
+                       this.outputFileConfig = checkNotNull(outputFileConfig);
+               }
+
+               public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+                       this.bucketAssigner = checkNotNull(assigner);
+                       return self();
+               }
+
+               public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
+                       this.rollingPolicy = checkNotNull(policy);
+                       return self();
+               }
+
+               public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+                       this.outputFileConfig = outputFileConfig;
+                       return self();
+               }
+
+               public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy(
+                               BucketAssigner<IN, BucketID> assigner,
+                               RollingPolicy<IN, BucketID> policy) {
+                       Preconditions.checkState(
+                                       bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+                                       "newBuilderWithBucketAssignerAndPolicy() cannot be called "
+                                                       + "after specifying a customized bucket factory");
+                       return new RowFormatBuilder<>(
+                                       basePath,
+                                       encoder,
+                                       checkNotNull(assigner),
+                                       checkNotNull(policy),
+                                       bucketFactory,
+                                       outputFileConfig);
+               }
+
+               @VisibleForTesting
+               T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+                       this.bucketFactory = Preconditions.checkNotNull(factory);
+                       return self();
+               }
+
+               /** Creates the actual sink. */
+               public FileSink<IN, BucketID> build() {
+                       return new FileSink<>(this);
+               }
+
+               @Override
+               public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {

Review comment:
       It seems we cannot change the modifier, since the method is inherited from an interface?

##########
File path: flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/FileSink.java
##########
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.serialization.BulkWriter;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.connector.sink.Committer;
+import org.apache.flink.api.connector.sink.GlobalCommitter;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.committer.FileCommitter;
+import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriter;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
+import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BulkBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.DateTimeBucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.CheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Optional;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A unified sink for both streaming and blocking mode, based on the new Sink API.
+ */
+@Experimental
+public class FileSink<IN, BucketID>
+               implements Sink<IN, FileSinkCommittable, FileWriterBucketState<BucketID>, Void> {
+
+       private final BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder;
+
+       private FileSink(BucketsBuilder<IN, BucketID, ? extends BucketsBuilder<IN, BucketID, ?>> bucketsBuilder) {
+               this.bucketsBuilder = checkNotNull(bucketsBuilder);
+       }
+
+       @Override
+       public Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> createWriter(
+                       InitContext context,
+                       List<FileWriterBucketState<BucketID>> states) throws IOException {
+               FileWriter<IN, BucketID> writer = bucketsBuilder.createWriter(context);
+               writer.initializeState(states);
+               return writer;
+       }
+
+       @Override
+       public Optional<SimpleVersionedSerializer<FileWriterBucketState<BucketID>>> getWriterStateSerializer()
+                       throws IOException {
+               return Optional.of(bucketsBuilder.getWriterStateSerializer());
+       }
+
+       @Override
+       public Optional<Committer<FileSinkCommittable>> createCommitter() throws IOException {
+               return Optional.of(bucketsBuilder.createCommitter());
+       }
+
+       @Override
+       public Optional<SimpleVersionedSerializer<FileSinkCommittable>> getCommittableSerializer()
+                       throws IOException {
+               return Optional.of(bucketsBuilder.getCommittableSerializer());
+       }
+
+       @Override
+       public Optional<GlobalCommitter<FileSinkCommittable, Void>> createGlobalCommitter() {
+               return Optional.empty();
+       }
+
+       @Override
+       public Optional<SimpleVersionedSerializer<Void>> getGlobalCommittableSerializer() {
+               return Optional.empty();
+       }
+
+       public static <IN> DefaultRowFormatBuilder<IN> forRowFormat(
+                       final Path basePath, final Encoder<IN> encoder) {
+               return new DefaultRowFormatBuilder<>(basePath, encoder, new DateTimeBucketAssigner<>());
+       }
+
+       /**
+        * The base abstract class for the {@link RowFormatBuilder} and {@link BulkFormatBuilder}.
+        */
+       @Internal
+       private abstract static class BucketsBuilder<IN, BucketID, T extends BucketsBuilder<IN, BucketID, T>>
+                       implements Serializable {
+
+               private static final long serialVersionUID = 1L;
+
+               public static final long DEFAULT_BUCKET_CHECK_INTERVAL = 60L * 1000L;
+
+               @SuppressWarnings("unchecked")
+               protected T self() {
+                       return (T) this;
+               }
+
+               @Internal
+               protected abstract FileWriter<IN, BucketID> createWriter(final InitContext context) throws IOException;
+
+               @Internal
+               protected abstract FileCommitter createCommitter() throws IOException;
+
+               @Internal
+               protected abstract SimpleVersionedSerializer<FileWriterBucketState<BucketID>> getWriterStateSerializer() throws IOException;
+
+               @Internal
+               protected abstract SimpleVersionedSerializer<FileSinkCommittable> getCommittableSerializer() throws IOException;
+       }
+
+       /**
+        * A builder for configuring the sink for row-wise encoding formats.
+        */
+       public static class RowFormatBuilder<IN, BucketID, T extends RowFormatBuilder<IN, BucketID, T>>
+                       extends BucketsBuilder<IN, BucketID, T> {
+
+               private static final long serialVersionUID = 1L;
+
+               private final Path basePath;
+
+               private final Encoder<IN> encoder;
+
+               private BucketAssigner<IN, BucketID> bucketAssigner;
+
+               private RollingPolicy<IN, BucketID> rollingPolicy;
+
+               private FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+               private OutputFileConfig outputFileConfig;
+
+               protected RowFormatBuilder(
+                               Path basePath,
+                               Encoder<IN> encoder,
+                               BucketAssigner<IN, BucketID> bucketAssigner) {
+                       this(
+                                       basePath,
+                                       encoder,
+                                       bucketAssigner,
+                                       DefaultRollingPolicy.builder().build(),
+                                       new DefaultFileWriterBucketFactory<>(),
+                                       OutputFileConfig.builder().build());
+               }
+
+               protected RowFormatBuilder(
+                               Path basePath,
+                               Encoder<IN> encoder,
+                               BucketAssigner<IN, BucketID> assigner,
+                               RollingPolicy<IN, BucketID> policy,
+                               FileWriterBucketFactory<IN, BucketID> bucketFactory,
+                               OutputFileConfig outputFileConfig) {
+                       this.basePath = checkNotNull(basePath);
+                       this.encoder = checkNotNull(encoder);
+                       this.bucketAssigner = checkNotNull(assigner);
+                       this.rollingPolicy = checkNotNull(policy);
+                       this.bucketFactory = checkNotNull(bucketFactory);
+                       this.outputFileConfig = checkNotNull(outputFileConfig);
+               }
+
+               public T withBucketAssigner(final BucketAssigner<IN, BucketID> assigner) {
+                       this.bucketAssigner = checkNotNull(assigner);
+                       return self();
+               }
+
+               public T withRollingPolicy(final RollingPolicy<IN, BucketID> policy) {
+                       this.rollingPolicy = checkNotNull(policy);
+                       return self();
+               }
+
+               public T withOutputFileConfig(final OutputFileConfig outputFileConfig) {
+                       this.outputFileConfig = outputFileConfig;
+                       return self();
+               }
+
+               public RowFormatBuilder<IN, BucketID, ? extends RowFormatBuilder<IN, BucketID, ?>> withNewBucketAssignerAndPolicy(
+                               BucketAssigner<IN, BucketID> assigner,
+                               RollingPolicy<IN, BucketID> policy) {
+                       Preconditions.checkState(
+                                       bucketFactory.getClass() == DefaultFileWriterBucketFactory.class,
+                                       "newBuilderWithBucketAssignerAndPolicy() cannot be called "
+                                                       + "after specifying a customized bucket factory");
+                       return new RowFormatBuilder<>(
+                                       basePath,
+                                       encoder,
+                                       checkNotNull(assigner),
+                                       checkNotNull(policy),
+                                       bucketFactory,
+                                       outputFileConfig);
+               }
+
+               @VisibleForTesting
+               T withBucketFactory(final FileWriterBucketFactory<IN, BucketID> factory) {
+                       this.bucketFactory = Preconditions.checkNotNull(factory);
+                       return self();
+               }
+
+               /** Creates the actual sink. */
+               public FileSink<IN, BucketID> build() {
+                       return new FileSink<>(this);
+               }
+
+               @Override
+               public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException {

Review comment:
       Yes, indeed, I changed the modifier to `package-private`.
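
       For reference, a minimal before/after sketch of that change, assuming the comment refers to the `createWriter` signature at the end of the hunk above (bodies elided, so this is illustrative only, not a compilable unit):

```java
// Before: public, callable from any package.
public FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException { /* ... */ }

// After: package-private, callable only from within the sink's own package.
FileWriter<IN, BucketID> createWriter(InitContext context) throws IOException { /* ... */ }
```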

##########
File path: 
flink-connectors/flink-connector-files/src/main/java/org/apache/flink/connector/file/sink/writer/FileWriter.java
##########
@@ -0,0 +1,278 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.file.sink.writer;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.connector.sink.Writer;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.sink.FileSinkCommittable;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
+import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
+import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
+import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
+/**
+ * Writer implementation for {@link FileSink}.
+ */
+public class FileWriter<IN, BucketID>
+               implements Writer<IN, FileSinkCommittable, FileWriterBucketState<BucketID>> {
+
+       private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
+
+       // ------------------------ configuration fields --------------------------
+
+       private final Path basePath;
+
+       private final FileWriterBucketFactory<IN, BucketID> bucketFactory;
+
+       private final BucketAssigner<IN, BucketID> bucketAssigner;
+
+       private final BucketWriter<IN, BucketID> bucketWriter;
+
+       private final RollingPolicy<IN, BucketID> rollingPolicy;
+
+       // --------------------------- runtime fields -----------------------------
+
+       private final BucketerContext bucketerContext;
+
+       private final Map<BucketID, FileWriterBucket<IN, BucketID>> activeBuckets;
+
+       private final OutputFileConfig outputFileConfig;
+
+       // --------------------------- State Related Fields -----------------------------
+
+       private final FileWriterBucketStateSerializer<BucketID> bucketStateSerializer;
+
+       /**
+        * A constructor creating a new empty bucket manager.
+        *
+        * @param basePath The base path for our buckets.
+        * @param bucketAssigner The {@link BucketAssigner} provided by the user.
+        * @param bucketFactory The {@link FileWriterBucketFactory} to be used to create buckets.
+        * @param bucketWriter The {@link BucketWriter} to be used when writing data.
+        * @param rollingPolicy The {@link RollingPolicy} as specified by the user.
+        * @param outputFileConfig The {@link OutputFileConfig} used to configure the part file names.
+        */
+       public FileWriter(
+                       final Path basePath,
+                       final BucketAssigner<IN, BucketID> bucketAssigner,
+                       final FileWriterBucketFactory<IN, BucketID> bucketFactory,
+                       final BucketWriter<IN, BucketID> bucketWriter,
+                       final RollingPolicy<IN, BucketID> rollingPolicy,
+                       final OutputFileConfig outputFileConfig) {
+
+               this.basePath = checkNotNull(basePath);
+               this.bucketAssigner = checkNotNull(bucketAssigner);
+               this.bucketFactory = checkNotNull(bucketFactory);
+               this.bucketWriter = checkNotNull(bucketWriter);
+               this.rollingPolicy = checkNotNull(rollingPolicy);
+
+               this.outputFileConfig = checkNotNull(outputFileConfig);
+
+               this.activeBuckets = new HashMap<>();
+               this.bucketerContext = new BucketerContext();
+
+               this.bucketStateSerializer = new FileWriterBucketStateSerializer<>(
+                               bucketWriter.getProperties().getInProgressFileRecoverableSerializer(),
+                               bucketAssigner.getSerializer());
+       }
+
+       /**
+        * Initializes the state after recovery from a failure.
+        *
+        * <p>During this process:
+        * <ol>
+        *     <li>we set the initial value for the part counter to the maximum value used before across all tasks and buckets.
+        *     This guarantees that we do not overwrite valid data,</li>
+        *     <li>we commit any pending files for previous checkpoints (previous to the last successful one from which we restore),</li>
+        *     <li>we resume writing to the previous in-progress file of each bucket, and</li>
+        *     <li>if we receive multiple states for the same bucket, we merge them.</li>
+        * </ol>
+        *
+        * @param bucketStates the recovered states of the active buckets.
+        *
+        * @throws IOException if anything goes wrong while retrieving the state or restoring/committing of any
+        *              in-progress/pending part files
+        */
+       public void initializeState(List<FileWriterBucketState<BucketID>> bucketStates) throws IOException {
+               for (FileWriterBucketState<BucketID> state : bucketStates) {
+                       BucketID bucketId = state.getBucketId();
+
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Restoring: {}", state);
+                       }
+
+                       FileWriterBucket<IN, BucketID> restoredBucket = bucketFactory.restoreBucket(
+                                       bucketWriter,
+                                       rollingPolicy,
+                                       state,
+                                       outputFileConfig);
+
+                       updateActiveBucketId(bucketId, restoredBucket);
+               }
+       }
+
+       private void updateActiveBucketId(
+                       BucketID bucketId,
+                       FileWriterBucket<IN, BucketID> restoredBucket) throws IOException {
+               final FileWriterBucket<IN, BucketID> bucket = activeBuckets.get(bucketId);
+               if (bucket != null) {
+                       bucket.merge(restoredBucket);
+               } else {
+                       activeBuckets.put(bucketId, restoredBucket);
+               }
+       }
+
+       @Override
+       public void write(IN element, Context context) throws IOException {
+               // setting the values in the bucketer context
+               bucketerContext.update(
+                               context.timestamp(),
+                               context.currentWatermark());
+
+               final BucketID bucketId = bucketAssigner.getBucketId(element, bucketerContext);
+               final FileWriterBucket<IN, BucketID> bucket = getOrCreateBucketForBucketId(bucketId);
+               bucket.write(element);
+       }
+
+       @Override
+       public List<FileSinkCommittable> prepareCommit(boolean flush) throws IOException {
+               List<FileSinkCommittable> committables = new ArrayList<>();
+
+               // Every time before we prepare a commit, we first check and remove the inactive
+               // buckets. Checking the activeness right before pre-committing avoids re-creating
+               // the bucket on every checkpoint if the bucket uses OnCheckpointRollingPolicy.
+               Iterator<Map.Entry<BucketID, FileWriterBucket<IN, BucketID>>> activeBucketIt =
+                               activeBuckets.entrySet().iterator();
+               while (activeBucketIt.hasNext()) {

Review comment:
       No, not this issue. I was referring to the issue that we might need to 
re-create the bucket on each checkpoint period if `OnCheckpointRollingPolicy` is 
used and we detect the inactiveness after sending the committables. In that case 
the bucket would always be detected as inactive, since all committables are sent 
out right before the detection.
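
       To make that ordering concrete, here is a small self-contained sketch (`ToyBucket` and its methods are invented for illustration; they are not the actual Flink API). With an `OnCheckpointRollingPolicy`-style setup, every file becomes a committable at checkpoint time, so if activeness were checked *after* draining the committables, every bucket would look empty and be removed, only to be re-created by the next record:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Toy model of the ordering issue described above. ToyBucket and its
// methods are invented for this sketch and are not the Flink API.
class ToyBucket {
    private final List<String> pendingFiles = new ArrayList<>();

    void write(String record) {
        pendingFiles.add(record);
    }

    // A bucket counts as "active" while it still has something to commit.
    boolean isActive() {
        return !pendingFiles.isEmpty();
    }

    List<String> drainCommittables() {
        List<String> out = new ArrayList<>(pendingFiles);
        pendingFiles.clear();
        return out;
    }
}

public class PrepareCommitOrdering {

    // Check activeness BEFORE draining the committables: a bucket that
    // received data since the last checkpoint survives, while a truly
    // idle bucket is removed. Draining first would make every bucket
    // look inactive and force its re-creation on the next element.
    static List<String> prepareCommit(Map<String, ToyBucket> activeBuckets) {
        List<String> committables = new ArrayList<>();
        Iterator<Map.Entry<String, ToyBucket>> it = activeBuckets.entrySet().iterator();
        while (it.hasNext()) {
            ToyBucket bucket = it.next().getValue();
            if (!bucket.isActive()) {
                it.remove();
            } else {
                committables.addAll(bucket.drainCommittables());
            }
        }
        return committables;
    }

    public static void main(String[] args) {
        Map<String, ToyBucket> activeBuckets = new HashMap<>();
        ToyBucket bucket = new ToyBucket();
        bucket.write("part-0");
        activeBuckets.put("2020-11-02--16", bucket);

        System.out.println(prepareCommit(activeBuckets)); // [part-0]
        System.out.println(activeBuckets.size());         // 1: bucket kept alive
    }
}
```

       This check-then-drain order is what the comment in `prepareCommit` above refers to.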





----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

